#!/usr/bin/python # Copyright (c) 2005 by Mark T. Holder, Florida State University. (see end of file) '''basic operations that involve multiple parts of PIPRes/CIPRES.''' #from PIPRes.util.client_import import * #from PIPRes.util.server_import import * from PIPRes.nexus.primitives import NexusToken, BadTreeDefError from PIPRes.utility import getBalancedString, chompIter from PIPRes.cipres_types import getDatatypeWithMappings, _translateRow, CipresDiscreteMatrix, CipresTree, newickListWithNamesToCipresTrees, newickWithNamesToCipresTree, dnaToCipresMatrix from PIPRes.discrete_datatype import toFastaDatatype from PIPRes.util.io import cipresGetLogger, logException, writeWrapped, getCipresTmpDir from PIPRes.util.cipres import getCipresProperties from PIPRes.wrap.idl import CipresIDL_api1, getTreeScore from PIPRes.util.cipres import getCipresRegistry import tempfile import os import string import re _LOG = cipresGetLogger('pipres.basic') _AFTER_TREE = re.compile(r'(:\d+(\.\d*([eE]-?\d+)?)?)?[;\s](.*)') class MatrixFormat: "Enum of matrix file formats" PHYLIP, RelaxedPHYLIP, NEXUS, FASTA = range(4) class FileFormatError(ValueError): '''Thrown for errors reading files.''' pass class TaxTreeCharTriple: '''Groups related taxa, characters, trees. Primarily a convenience class so that functions can get all data with one structure (so we do not need a convention like "send taxa as first arg, characters as second..." or rely on named args all the time. "matrixList" is a python list of DataMatrix objects (CipresDiscreteMatrix object, one per DATA or CHARACTERS block if the source is a NEXUS file) "treeList" is a python list of trees (CipresIDL_api1.Tree objects if read from Nexus) "taxa" is a list of the taxa names. >>> from PIPRes.basic import * ''' def __init__(self, taxaNames = (), trees = (), matrices = ()): self.taxa = list(taxaNames) self.treeList = list(trees) self.matrixList = list(matrices) def __nonzero__(self): '''Returns True if the instance has a non empty matrixList, treeList or taxa >>> t = TaxTreeCharTriple() >>> bool(t) False ''' return bool(len(self.taxa) or self.hasChars() or self.hasTrees()) def hasChars(self): "True if matrices are stored in matrixList" return len(self.matrixList) > 0 def hasTrees(self): "True if matrices are stored in treeList" return len(self.treeList) > 0 def __eq__(self, other): if self.taxa: if not self.taxa == other.taxa: return False else: if other.taxa: return False if self.treeList: if not self.treeList == other.treeList: return False else: if other.treeList: return False if self.matrixList: if not self.matrixList == other.matrixList: return False else: if other.matrixList: return False def readSimpleNexus(fileOrPath, returnTrees = True, returnMatrices = True, returnTaxa = True, registryWrapper = None): '''Returns a TaxTreeCharTriple object populated by reading the fileOrPath. Convenience function simply returns the first element from a getContentFromNexus() call. +++ Requires CIPRES ''' triple = getContentFromNexus(fileOrPath, returnTrees=returnTrees, returnMatrices=returnMatrices, returnTaxa=returnTaxa, registryWrapper = None)[0] triple.treeList = triple.treeList triple.matrixList = [CipresDiscreteMatrix(i) for i in triple.matrixList] return triple def readFilePathAsNexus(file): return getContentFromNexus(file, treatStringAsPath = True) def writeFilePathAsNexus(fp, listOfNexusBlobs): out = open(fp, "w") try: writeAsNexus(out, listOfNexusBlobs, appending=False) finally: out.close() def writeAsNexus(out, listOfNexusTriples, appending=True): """`listOfNexusBlobs` is a list that of TaxTreeCharTriple objects.""" if not appending: out.write("#NEXUS\n") for blob in listOfNexusTriples: write_tax = True taxa = blob.taxa for mat in blob.matrixList: writeNexusDataMatrix(out, mat, taxa, writeTaxaBlock=write_tax, writeAsCharBlock=True) write_tax=False if blob.treeList: writeTreesBlock(out, blob.treeList, taxa, writeTaxaBlock=write_tax) write_tax=False def getContentFromNexus(fileOrPath, **kwargs): '''Returns a list of TaxTreeCharTriple objects populated by reading the fileOrPath.''' returnTrees = kwargs.get('returnTrees', True) returnMatrices = kwargs.get('returnMatrices', True) returnTaxa = kwargs.get('returnTaxa', True) registryWrapper = kwargs.get('registryWrapper') treatStringAsPath = kwargs.get('treatStringAsPath', True) writeFileObjToFile = kwargs.get('writeFileObjToFile', True) tmpFileName = None if isinstance(fileOrPath, str): if treatStringAsPath: fileOrPath = os.path.abspath(fileOrPath) else: prop = getCipresProperties() if not (prop and prop.applicationSendNexusAsString): parDir = getCipresTmpDir() # tempfile.NamedTemporaryFile can't be read by another application on # modern windows, so we'll use mkstemp instead tmpFD, tmpFileName = tempfile.mkstemp(suffix = '.nex', dir=parDir) try: os.write(tmpFD, fileOrPath.read()) fileOrPath = tmpFileName os.close(tmpFD) except: os.remove(tmpFileName) raise else: treatStringAsPath = False fileOrPath = '\n'.join(fileOrPath.readlines()) registryWrapper = (registryWrapper is None) and getCipresRegistry() or registryWrapper content = [] nexusReader = registryWrapper.getServiceOrThrow(CipresIDL_api1.ReadNexus) try: kwargs['treatStringAsPath'] = treatStringAsPath content = getContentUsingNexusReader(fileOrPath, nexusReader, **kwargs) finally: nexusReader.remove() if tmpFileName is not None: os.remove(tmpFileName) return content def getTreeFromPhylipStream(f): '''Reads a newick tree from a file object. Returns the tree string and the remainder of the line (everything after the ; Expects that the first graphical character is a (. Strings are then concatenated until a matching ) is found. An exception is raised if the next character is not ; or whitespace The file object is advanced by lines''' fIter = iter(f) for line in fIter: stripped = line.lstrip() if len(stripped) > 0: if not stripped.startswith('('): raise ValueError, 'Expecting the string that starts with (, found %s' % str(stripped) newick, pref, remaining = getBalancedString(stripped, '(', ')', forbidden=';', sourceIter=chompIter(fIter)) if len(remaining) > 0: m = _AFTER_TREE.match(remaining) if not m: s = 'Expecting ; or whitespace after the tree representation (found %s)' % remaining[0] raise ValueError, s g = m.groups() if g[0]: newick = '%s%s' % (newick, g[0]) remaining = g[3] return '%s;' % newick, remaining raise ValueError, 'Did not find tree' def readTreeFromPhylipStream(f, **kwargs): '''Uses getTreeStringFromPhylipFile to get a tree string and returns a CipresTree.''' newick = getTreeFromPhylipStream(f)[0] return newickWithNamesToCipresTree(newick, **kwargs) def readTreeFromPhylipFile(fp, **kwargs): "Reads a newick tree from the file path (see getTreeFromPhylipStream). kwargs are passed to CipresTree.__init__" f = open(fp, 'rU') try: try: return readTreeFromPhylipStream(f, **kwargs) except BadTreeDefError: raise except ValueError, x: raise ValueError, 'Error in %s: %s' % (fp, x.args[0]) finally: f.close() def getContentUsingNexusReader(contentOrPath, nexusReader, **kwargs): '''Uses "nexusReader" to parse "contentOrPath" (the boolean treatStringAsPath argument determines how "contentOrPath" is interpretted). return args control what content from the file is converted to python objects.''' returnTrees = kwargs.get('returnTrees', True) returnMatrices = kwargs.get('returnMatrices', True) returnTaxa = kwargs.get('returnTaxa', True) treatStringAsPath = kwargs.get('treatStringAsPath', True) funcToCall = treatStringAsPath and nexusReader.readNexusFile or nexusReader.readStringAsNexus blocksRead = funcToCall(contentOrPath, -1) assert(len(blocksRead) >= 3) #should report taxa, trees, characters (in that order) at least _LOG.debug("Read %s blocks" % str(blocksRead)) taxa = [] matrices = [] trees = [] if returnTrees: nTreesBlocks = blocksRead[CipresIDL_api1.ReadNexus.NEXUS_TREES_BLOCK_INDEX] for i in range(nTreesBlocks): trees.extend(nexusReader.getTrees(i)) if returnMatrices: nCharsBlocks = blocksRead[CipresIDL_api1.ReadNexus.NEXUS_CHARACTERS_BLOCK_INDEX] matrices = [nexusReader.getCharacters(i) for i in range(nCharsBlocks)] nTaxaBlocks = blocksRead[CipresIDL_api1.ReadNexus.NEXUS_TAXA_BLOCK_INDEX] tmpTaxa = nTaxaBlocks > 0 and [nexusReader.getTaxa(i) for i in range(nTaxaBlocks)] or [[]] if nTaxaBlocks > 1: firstTaxaBlock = tmpTaxa[0] for i in range(1,nTaxaBlocks): if tmpTaxa[i] != firstTaxaBlock: raise FileFormatError, 'Multiple taxa blocks are not supported yet.' tmpTaxa = [firstTaxaBlock] taxa = returnTaxa and tmpTaxa or [] trip = TaxTreeCharTriple(taxa[0], trees, matrices) return [trip] def createDefaultNexusTaxaNames(defNtax, taxSlice = None): '''Returns taxLabels, taxSlice tuple. taxSlice, if sent, should be a iterable of 0-based indices of the taxa to create names for''' if taxSlice: taxLabels = {} for r in taxSlice: taxLabels[r] = 'tax' + str(r+1) else: taxSlice = xrange(defNtax) taxLabels = ['tax%d' % i for i in range(1, defNtax + 1)] return taxLabels, taxSlice def _getValidTaxLabelsAndSlice(nRows, taxLabels, rowSlice): if not taxLabels: return createDefaultNexusTaxaNames(nRows, rowSlice) ntax = len(taxLabels) if rowSlice: assert(ntax == len(rowSlice)) else: assert(ntax == nRows) rowSlice = xrange(ntax) return taxLabels, rowSlice def writeDataMatrix(matrix, matrixFile, taxLabels=[], rowSlice=None, colSlice=None, format=MatrixFormat.NEXUS): if format == MatrixFormat.NEXUS: writeNexusDataMatrix(matrixFile, matrix, taxLabels=taxLabels, rowSlice=rowSlice, colSlice=colSlice) elif format == MatrixFormat.PHYLIP or format == MatrixFormat.RelaxedPHYLIP: writePhylipMatrix(matrixFile, matrix, taxLabels=taxLabels, rowSlice=rowSlice, colSlice=colSlice) elif format == MatrixFormat.FASTA: writeFastaMatrix(matrixFile, matrix, taxLabels=taxLabels, rowSlice=rowSlice, colSlice=colSlice) else: raise ValueError, 'Unknown format code %d' % format def approxEqual(x, y, tol=1.0e-6): return abs(x - y) <= tol def cipresMatrixToRawStringMatrix(mat, taxLabels=[], rowSlice = None, colSlice = None): """Converts a Cipres data matrix to a matrix of strings encoding the same data.""" matrix = mat.m_matrix datatype = getDatatypeWithMappings(mat) nRows = len(matrix) taxLabels, rowSlice = _getValidTaxLabelsAndSlice(nRows, taxLabels, rowSlice) ntax = len(taxLabels) if ntax < 1: return nchar = colSlice and len(colSlice) or len(matrix[0]) if nchar < 1: return [] r = [] if colSlice: for row in rowSlice: rowToWrite = matrix[row] r.append(''.join([datatype.internalIndexToString[rowToWrite[c]] for c in colSlice])) else: for row in rowSlice: r.append(''.join(_translateRow(datatype, matrix[row]))) return r def _commonWriteOps(mat, taxLabels, rowSlice, colSlice): matrix = mat.m_matrix datatype = getDatatypeWithMappings(mat) nRows = len(matrix) taxLabels, rowSlice = _getValidTaxLabelsAndSlice(nRows, taxLabels, rowSlice) ntax = len(taxLabels) nchar = colSlice and len(colSlice) or len(matrix[0]) return matrix, datatype, taxLabels, rowSlice, ntax, nchar def writeFastaMatrix(out, mat, taxLabels=[], rowSlice = None, colSlice = None): matrix, datatype, taxLabels, rowSlice, ntax, nchar = _commonWriteOps(mat, taxLabels, rowSlice, colSlice) datatype = toFastaDatatype(datatype) if ntax < 1 or nchar < 1: return for row in rowSlice: out.write('>%s\n' % (taxLabels[row])) if colSlice: rowToWrite = matrix[row] s = ''.join([datatype.internalIndexToString[rowToWrite[c]] for c in colSlice]) else: s = ''.join(_translateRow(datatype, matrix[row])) writeWrapped(out, s, 80) out.write('\n') return out def writePhylipMatrix(out, mat, taxLabels=[], rowSlice = None, colSlice = None): matrix, datatype, taxLabels, rowSlice, ntax, nchar = _commonWriteOps(mat, taxLabels, rowSlice, colSlice) datatype = toFastaDatatype(datatype) if ntax < 1 or nchar < 1: return out.write('%d %d\n' % (ntax, nchar)) maxLabelLen = 10 formatStr = '%%-%ds' % maxLabelLen if colSlice: for row in rowSlice: out.write(formatStr % (taxLabels[row][:maxLabelLen])) rowToWrite = matrix[row] out.write(''.join([datatype.internalIndexToString[rowToWrite[c]] for c in colSlice])) out.write('\n') else: for row in rowSlice: out.write(formatStr % (taxLabels[row][:maxLabelLen])) out.write('%s\n' % ''.join(_translateRow(datatype, matrix[row]))) return out def nexusTokenizeSelectedWords(labels, selectionSlice): """Returns a dictionary that maps the elements of the iterable `selectionSlice` to NEXUS tokenized versions of the string value with the same key from `labels` """ tokenizedLabels = {} for i in selectionSlice: tokenizedLabels[i] = NexusToken.escapeString(str(labels[i])) return tokenizedLabels def writeNexusTaxaBlock(out, ntax, taxLabels, selectionSlice): """`selectionSlice` is the `rowSlice` of `_commonWriteOps`""" tokenizedTaxLabels = nexusTokenizeSelectedWords(taxLabels, selectionSlice) out.write("BEGIN TAXA;\n Dimensions ntax = %d;\n TaxLabels " % ntax) for row in selectionSlice: out.write(" %s" % tokenizedTaxLabels[row]) out.write(" ;\nEND;\n") return tokenizedTaxLabels def writeNexusDataMatrix(out, mat, taxLabels = [], rowSlice = None, colSlice = None, **kwargs): ''' rowSlice, and colSlice of None indicate that all rows and characters should be written''' matrix, datatype, taxLabels, rowSlice, ntax, nchar = _commonWriteOps(mat, taxLabels, rowSlice, colSlice) if ntax < 1 or nchar < 1: return block_name = "CHARACTERS" nt_str = "" if kwargs.get("writeTaxaBlock"): tokenizedTaxLabels = writeNexusTaxaBlock(out, ntax, taxLabels, rowSlice) elif not kwargs.get("writeAsCharBlock"): block_name = "DATA" nt_str = "ntax = %d " % ntax tokenizedTaxLabels = nexusTokenizeSelectedWords(taxLabels, rowSlice) out.write('BEGIN %s;\n\tDimensions %snchar = %d;\n\tFormat datatype = %s ' % (block_name, nt_str, nchar, datatype.getNexusType())) if datatype.m_datatype == CipresIDL_api1.GENERIC_DATATYPE: out.write('symbols = "%s" ' % datatype.m_symbols[:datatype.m_numStates]) out.write('gap = -;\n\tmatrix\n') maxLabelLen = max([len(i) for i in tokenizedTaxLabels.itervalues()]) formatStr = '%%-%ds ' % maxLabelLen if colSlice: for row in rowSlice: out.write(formatStr % (tokenizedTaxLabels[row])) rowToWrite = matrix[row] out.write(''.join([datatype.internalIndexToString[rowToWrite[c]] for c in colSlice])) out.write('\n') else: for row in rowSlice: out.write(formatStr % (tokenizedTaxLabels[row])) out.write('%s\n' % ''.join(_translateRow(datatype, matrix[row]))) out.write(';\nEND;\n') return out def writeTrees(out, trees, taxLabels = [], taxSlice = None, numTax = None): '''taxSlice (if not None) is the set of taxa to include (as 0-based indices). If taxLabels = [] and numTax is None then the trees will be search to find the maximum index. ''' pruning = bool(taxSlice) if len(trees) < 1 or pruning and len(taxSlice) < 2: return ntaxUsed = None if not taxLabels: ntaxUsed = numTax and numTax or 1 + max([t.getMaxTaxonIndex() for t in trees]) taxLabels, taxSlice = createDefaultNexusTaxaNames(ntaxUsed, taxSlice) else: if not taxSlice: taxSlice = xrange(len(taxLabels)) if pruning: assert(false) # unimplemented, doh. else: for tree in trees: out.write('%s;\n' % (tree.getNewick(newTaxLabels=taxLabels))) return out def writeTreesBlock(out, trees, taxLabels = [], taxSlice = None, numTax = None, comments="", **kwargs): '''taxSlice (if not None) is the set of taxa to include (as 0-based indices). If taxLabels = [] and numTax is None then the trees will be search to find the maximum index. ''' pruning = bool(taxSlice) if len(trees) < 1 or pruning and len(taxSlice) < 2: return ntaxUsed = None if taxLabels: if not taxSlice: taxSlice = xrange(len(taxLabels)) else: ntaxUsed = numTax and numTax or 1 + max([t.getMaxTaxonIndex() for t in trees]) taxLabels, taxSlice = createDefaultNexusTaxaNames(ntaxUsed, taxSlice) if kwargs.get("writeTaxaBlock"): writeNexusTaxaBlock(out, ntaxUsed, taxLabels, taxSlice) out.write('BEGIN TREES;\n') if comments: if comments[0] == "[" and comments[-1] == "]": out.write("%s\n" % comments) else: out.write("[%s]\n" % comments) tsIter = iter(taxSlice) i = tsIter.next() out.write('\tTranslate\n\t\t%d %s' % (i+1, NexusToken.escapeString(taxLabels[i]))) for i in tsIter: out.write(',\n\t\t%d %s' % (i+1, NexusToken.escapeString(taxLabels[i]))) out.write(';\n') #out.write('\t\t;\n') for n, tree in enumerate(trees): if pruning: assert(false) # unimplemented, doh. else: tName = NexusToken.escapeString(tree.name and tree.name or 'tree' + str(n + 1)) try: sc_str = str(getTreeScore(tree)) tName = "%s [score=%s]" % (tName, sc_str) except: pass # scores are optional, so we will just suppress any errors. out.write('TREE %s = [&%c] %s\n' % (tName, tree.rooted and 'R' or 'U', tree.m_newick)) #m_newick has the ; out.write('END;\n') return out if __name__ == '__main__': import doctest doctest.testmod() # This file is part of the PIPRes library # # The PIPRes library is free software; you can redistribute it # and/or modify it under the terms of the GNU Lesser General # Public License as published by the Free Software Foundation; # either version 2.1 of the License, or (at your option) any later # version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free # Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, # MA 02111-1307, USA