#!/usr/bin/python
# Copyright (c) 2005 by Mark T. Holder, Florida State University. (see end of file)
"""Handles the reading of NEXUS Characters or Data blocks."""
# `copy` and `string` are used below; import them explicitly instead of
# relying on one of the star imports happening to re-export them.
import copy
import string

from cipres.temp_seqs import *
from primitives import *
from command_reader import *
from cipres.util.io import *
# NOTE(review): `Set` is no longer used in this module (builtin `set` is used
# instead); the import is retained in case other code picks it up from here.
from sets import Set
import taxa_block
from arbol.nexus import find_nexus_label
from cipres.util.io import cipresGetLogger

_LOG = cipresGetLogger('ciprespy.nexus.characters_block')


def getCharacterMatricesFromNexus(inF, getTaxaFromAllPublic=True, **kwdsToCharsBlock):
    """Return the matrix rows of every CHARACTERS/DATA block read from `inF`.

    inF -- the input handed to NexusBlockStream.
    getTaxaFromAllPublic -- if true, all handlers in
        public_blocks.ALL_PUBLIC_BLOCKS are used (a shallow copy is taken
        before the keyword arguments are attached); otherwise only the
        CHARACTERS and DATA handlers are registered.
    kwdsToCharsBlock -- keyword arguments passed on to the characters/data
        block initializers.

    Returns one flat list built by extending with each block's `matrix`.
    """
    if getTaxaFromAllPublic:
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- TODO confirm).
        import public_blocks
        handlerDict = copy.copy(public_blocks.ALL_PUBLIC_BLOCKS)
        public_blocks.addBlockInitializationArgs(handlerDict,
                                                 ['CHARACTERS', 'DATA'],
                                                 kwdsToCharsBlock)
    else:
        handlerDict = {
            'CHARACTERS': (NexusCharactersBlock, kwdsToCharsBlock),
            'DATA': (NexusDataBlock, kwdsToCharsBlock),
        }
    matrixList = []
    for block in NexusBlockStream(inF, handlerDict, True, []):
        if isinstance(block, (NexusCharactersBlock, NexusDataBlock)):
            matrixList.extend(block.matrix)
    return matrixList


def getCharacterMatricesFromNexusString(s, getTaxaFromAllPublic=True, **kwdsToCharsBlock):
    """Same as getCharacterMatricesFromNexus, reading from the string `s`."""
    import cStringIO
    # The keyword dict must be re-expanded with `**`; passing it positionally
    # (as the code used to) raises TypeError because the callee accepts only
    # two positional arguments.
    return getCharacterMatricesFromNexus(cStringIO.StringIO(s),
                                         getTaxaFromAllPublic,
                                         **kwdsToCharsBlock)


def getCharacterMatricesFromNexusFileName(inFilename, getTaxaFromAllPublic=True, **kwdsToCharsBlock):
    """Same as getCharacterMatricesFromNexus, for the file named `inFilename`.

    The file is located and opened by openInPathAndCall.
    """
    # NOTE(review): getTaxaFromAllPublic used to be silently dropped here.
    # It is forwarded as a keyword on the assumption that openInPathAndCall
    # passes keyword arguments through to the callable (it was already being
    # handed kwdsToCharsBlock that way) -- confirm against its definition.
    return openInPathAndCall(inFilename,
                             getCharacterMatricesFromNexus,
                             getTaxaFromAllPublic=getTaxaFromAllPublic,
                             **kwdsToCharsBlock)


class NexusMatrixReader:
    """Reader for the MATRIX command of a CHARACTERS/DATA block.

    Only non-interleaved matrices are handled (readCommand insists that the
    block's `interleave` field is False).
    """

    def __init__(self, containingBlock):
        self.taxa_mgr = containingBlock

    @staticmethod
    def verifyBlockField(blockObj, s, tok, v=None):
        """Check that `blockObj` has an instance attribute named `s`.

        If `v` is not None the attribute must also equal `v`.  Raises
        NexusError (tagged with `tok`) on failure; returns True otherwise.
        """
        fields = blockObj.__dict__
        if s not in fields:
            msg = 'FORMAT command must precede Matrix (%s field missing)' % s
            raise NexusError(msg, token=tok)
        if (v is not None) and fields[s] != v:
            msg = 'Currently FORMAT %s must equal %s' % (s, str(v))
            raise NexusError(msg, token=tok)
        return True

    @staticmethod
    def translateSeqList(sList, cDict, missingCode):
        """Map every element of `sList` through `cDict` and join the results.

        Elements absent from `cDict` are rendered as `missingCode`.
        """
        return ''.join([cDict.get(c, missingCode) for c in sList])

    def readCommand(self, cName, cStream, obj, blockObj):
        """Read a MATRIX command from `cStream` into `obj.matrix`.

        Verifies that the fields the matrix reader depends on are already
        present on `blockObj`, reads the matrix, then demands the end of the
        command.  Returns True; raises NexusError on malformed input.
        """
        verify = NexusMatrixReader.verifyBlockField
        verify(blockObj, 'interleave', cName, False)
        for field in ('labels', 'symbolDict', 'symbols', 'codeToSymbolDict',
                      'nchar', 'ntax'):
            verify(blockObj, field, cName)
        self.readNonInterleavedMatrix(cStream.getTokenStream(), obj, blockObj)
        cStream.demandCommandEnd()
        return True

    def readDataCell(self, tok_stream, blockObj):
        """Read one matrix cell and return its code from blockObj.symbolDict.

        A `{...}` group is returned as the sum of the codes of the symbols it
        contains.  `(...)` (polymorphism) and any symbol missing from
        symbolDict raise NexusError.
        """
        symbolDict = blockObj.symbolDict
        c = tok_stream.next_char()
        try:
            return symbolDict[c]
        except KeyError:
            pass
        if c == '(':
            msg = "Polymorphic data cells are not supported yet"
            raise NexusError(msg, token=tok_stream.token)
        if c == '{':
            # NOTE(review): codes are added, so a symbol repeated inside the
            # braces is counted twice; if the codes are bit flags this should
            # probably be a bitwise OR -- confirm against
            # constructNexusSymbolsTranslation.
            n = 0
            try:
                while True:
                    c = tok_stream.next_char()
                    if c == '}':
                        return n
                    n += symbolDict[c]
            except KeyError:
                # Unknown symbol inside the braces: report it below.
                pass
        # The `%` operator was missing here, which made the whole module fail
        # to compile.
        msg = "Unexpected character: %s" % c
        raise NexusError(msg, token=tok_stream.token)

    def read_n_chars(self, n, tok_stream, blockObj):
        """Read exactly `n` data cells and return their codes as a list.

        Raises NexusError if a cell cannot be read (the 1-based character
        number is appended to the message) or if the current token still has
        characters left after `n` cells.
        """
        nc = []
        for nCharRead in range(n):
            try:
                nc.append(self.readDataCell(tok_stream, blockObj))
            except NexusError as err:
                err.message = err.message + ' (at character number %d)' % (nCharRead + 1)
                # Bare raise keeps the original traceback.
                raise
        if tok_stream.more_chars_in_token():
            c = tok_stream.next_char()
            eMsg = 'Expecting the end of the character array, found %s' % c
            raise NexusError(eMsg, token=tok_stream.token)
        return nc

    def readNonInterleavedMatrix(self, tok_stream, obj, blockObj):
        """Read blockObj.ntax rows of blockObj.nchar cells into obj.matrix.

        If the block has no taxon labels yet, each row name is added as a new
        taxon in reading order; otherwise the row name is looked up among the
        existing labels and a repeated taxon raises NexusError.  Rows are
        stored by taxon index.  For DNA and PROTEIN data the rows are wrapped
        in DNASeq / AASeq; for other datatypes the raw code lists are stored.
        Returns True.
        """
        addLabels = len(blockObj.get_tax_labels()) == 0
        dataForTaxInd = [''] * blockObj.ntax
        taxIndSet = set()
        for nTaxRead in range(blockObj.ntax):
            nameTok = tok_stream.next()
            if addLabels:
                blockObj.add_taxa([nameTok])
                taxInd = nTaxRead
                assert(taxInd < len(blockObj.get_tax_labels()))
            else:
                taxInd = find_nexus_label(nameTok, blockObj.get_tax_labels(), 'taxon')
                if taxInd in taxIndSet:
                    # The label is taken from get_tax_labels(), the same
                    # source used for the status message below.
                    msg = ''.join(['Data for taxon number ', str(taxInd + 1),
                                   " (name = ",
                                   blockObj.get_tax_labels()[taxInd].upper(),
                                   ") have already been read"])
                    raise NexusError(msg, token=nameTok)
            taxIndSet.add(taxInd)
            NexusParsing.statusMessage('Reading data for %s...\n' % blockObj.get_tax_labels()[taxInd])
            dataForTaxInd[taxInd] = self.read_n_chars(blockObj.nchar, tok_stream, blockObj)
        if False:
            # Disabled: would additionally build a list of Biopython Seq
            # objects in obj.bioMatrix.  Kept for reference only.
            if blockObj.dataType != 'DNA' and blockObj.dataType != 'PROTEIN':
                return True
            from Bio import Alphabet
            from Bio.Alphabet import IUPAC
            if blockObj.dataType == 'DNA':
                alfa = Alphabet.Gapped(IUPAC.ambiguous_dna)
                missingIUPAC = 'N'
            elif blockObj.dataType == 'PROTEIN':
                NexusParsing.statusMessage('Converting protein sequence to IUPAC code all sites with any ambiguity will become "X".\n')
                alfa = Alphabet.Gapped(IUPAC.protein)
                missingIUPAC = 'X'
            bmat = []
            from Bio.Seq import Seq
            NexusParsing.statusMessage('Converting to IUPAC code all missing sites will become %s.\n' % missingIUPAC)
            for nTaxRead in range(blockObj.ntax):
                NexusParsing.statusMessage('Translating data for %s to Bio.Seq() ...\n' % blockObj.get_tax_labels()[nTaxRead])
                s = NexusMatrixReader.translateSeqList(dataForTaxInd[nTaxRead], blockObj.codeToSymbolDict, missingIUPAC)
                bmat.append(Seq(s, alfa))
            obj.bioMatrix = bmat
        if blockObj.dataType == 'DNA':
            obj.matrix = [DNASeq(row) for row in dataForTaxInd]
        elif blockObj.dataType == 'PROTEIN':
            obj.matrix = [AASeq(row) for row in dataForTaxInd]
        else:
            obj.matrix = dataForTaxInd
        NexusParsing.statusMessage('Data matrix read...\n')
        return True


class EquateSubCommandReader(NexusUnsupportedSubCommand):
    """FORMAT's EQUATE subcommand, registered as an unsupported subcommand."""

    def __init__(self):
        super(EquateSubCommandReader, self).__init__('Equate', '')


def verifyNotInSymbols(c, sym, name, tok):
    """Raise NexusError if character `c` (when not None) occurs in `sym`.

    `name` is the human-readable role of `c` used in the message.  Returns
    True when there is no clash.
    """
    if (c is not None) and sym.count(c) > 0:
        msg = ('The "%s" character (%s) may not be '
               'included in the SYMBOLS list') % (name, c)
        raise NexusError(msg, token=tok)
    return True


class NexusCharactersBlock(NexusBlock, ContainedTaxaManager):
    """Reader for a NEXUS CHARACTERS block.

    The command readers (cmdHandlers, dimensionsCommand, formatCommand, ...)
    and rnaEquates are attached to the class by initializeCharDataBlock().
    """
    cmdHandlers = []
    illegalSpecialChars = """()[]{}/\\,;:=*'"`<>^"""

    @staticmethod
    def _checkSpecialChar(c, kind, tok):
        """Raise NexusError unless `c` is one non-blank, legal character."""
        if len(c) != 1 or not c.strip() or c in NexusCharactersBlock.illegalSpecialChars:
            # The original validators built this exception but never raised
            # it, so invalid characters were silently accepted.
            raise NexusError('%s is not a valid %s character' % (c, kind), token=tok)
        return True

    @staticmethod
    def isValidGapChar(subCmd, c, tok, obj):
        """Validate the FORMAT GAP character; raises NexusError if invalid."""
        return NexusCharactersBlock._checkSpecialChar(c, 'GAP', tok)

    @staticmethod
    def isValidMatchChar(subCmd, c, tok, obj):
        """Validate the FORMAT MATCHCHAR character; raises NexusError if invalid."""
        return NexusCharactersBlock._checkSpecialChar(c, 'MATCH', tok)

    @staticmethod
    def isValidSymbolsList(subCmd, c, tok, obj):
        """Normalize a SYMBOLS value and store it in obj['Symbols'].

        Whitespace is removed, duplicates are dropped and the remaining
        characters are sorted.  Always returns True.
        """
        obj['Symbols'] = ''.join(sorted(set(''.join(c.split()))))
        return True

    @staticmethod
    def isValidMissingChar(subCmd, c, tok, obj):
        """Validate the FORMAT MISSING character; raises NexusError if invalid."""
        return NexusCharactersBlock._checkSpecialChar(c, 'MISSING', tok)

    def __init__(self, beginCmd=None, commandStream=None, previousBlocks=None, **kwds):
        self.taxa_mgr = get_taxa_manager_from_args(kwds)
        if commandStream is None:
            self.prepareToRead(previousBlocks or [])
        NexusBlock.__init__(self, beginCmd, commandStream, previousBlocks)

    @staticmethod
    def formatCommandIsValid(self, cmd, obj, tok):
        """Validate a parsed FORMAT command and record its settings on `self`.

        This is registered as a static validator on the Format command
        reader; `self` is an explicit first argument (the code stores the
        block's dataType/symbols/gap/... on it).  `obj` is the mapping of
        parsed subcommand values and `tok` is used to tag errors.

        Sets self.dataType, symbols, gap, missing, matchChar, equates and
        respectCase, calls constructNexusSymbolsTranslation, and writes the
        final symbol list back to obj['Symbols'].  Returns True.

        Raises NexusError for clashing special characters, special characters
        that are also native symbols, and for the unsupported features
        (CONTINUOUS data, user-defined symbols, MatchChar).
        """
        _npb_identityTrans = string.maketrans('', '')
        matchChar = obj.get('MatchChar')
        gap = obj.get('Gap')
        missing = obj.get('Missing')

        # Each pair of special characters is checked independently, so a
        # gap/missing clash is caught whether or not a MatchChar was given.
        eMsg = ""
        if matchChar is not None and matchChar == gap:
            eMsg = "MatchChar cannot equal the gap char"
        elif matchChar is not None and matchChar == missing:
            eMsg = "MatchChar cannot equal the missing char"
        elif gap is not None and gap == missing:
            eMsg = "Gap char cannot equal the missing char"
        if eMsg:
            raise NexusError(eMsg, token=tok)

        dt = obj['DataType'].upper()
        self.dataType = dt
        sym = obj['Symbols']
        respectCase = obj['RespectCase']
        assert sym is not None
        # The key might exist in the dict but still be at the default None.
        # A copy is taken so that the insert below never mutates the list
        # held by `obj`.
        equates = list(obj.get('Equate') or [])

        if dt == 'CONTINUOUS':
            msg = 'CONTINUOUS datatype is not supported yet'
            raise NexusError(msg, token=tok)
        if dt == 'NUCLEOTIDE':
            # U is dealt with using an equate
            sym = sym.translate(_npb_identityTrans, 'Uu')
            equates.insert(0, ('U', 'T'))

        if dt == 'STANDARD':
            native = '01' if sym == '' else ''
            nativeEquates = []
        elif dt == 'DNA' or dt == 'NUCLEOTIDE':
            # `dt` is upper-cased, so the comparison must be too; the old
            # mixed-case 'Nucleotide' test could never match.
            native = DNASeq.symbols[:-1]  # we don't always use the -
            nativeEquates = DNASeq.equates
        elif dt == 'RNA':
            native = 'ACGU'
            nativeEquates = NexusCharactersBlock.rnaEquates
        elif dt == 'PROTEIN':
            native = AASeq.symbols[:-1]  # we don't always add the -
            nativeEquates = AASeq.equates
        else:
            raise NexusError('%s datatype is not supported' % dt, token=tok)
        # Build a new list: extending nativeEquates in place would grow the
        # shared class-level equate tables on every FORMAT command.
        equates = list(nativeEquates) + equates

        verifyNotInSymbols(gap, native, 'Gap', tok)
        verifyNotInSymbols(missing, native, 'Missing', tok)
        verifyNotInSymbols(matchChar, native, 'MatchChar', tok)
        if gap is not None:
            native = native + gap
        if len(sym) > 0:
            # once you introduce new symbols you are in no-man's land in
            # terms of data type
            self.dataType = 'STANDARD'
            msg = 'User defined symbols are not supported yet'
            raise NexusError(msg, token=tok)
        # `sym` is empty at this point, so these deletions are currently
        # no-ops; they are kept for when user-defined symbols are supported.
        sym = sym.translate(_npb_identityTrans, native)
        if not respectCase:
            sym = sym.translate(_npb_identityTrans, native.lower())
        sym = native + sym

        self.symbols = sym
        self.gap = gap
        self.missing = missing
        self.matchChar = matchChar
        self.equates = equates
        self.respectCase = respectCase
        constructNexusSymbolsTranslation(self, self.respectCase, self.missing,
                                         self.dataType != 'PROTEIN', tok)
        obj['Symbols'] = sym
        if self.matchChar is not None:
            raise NexusError('MatchChar is not supported yet.', token=tok)
        return True


class NexusDataBlock(NexusCharactersBlock):
    """Reader/writer for a NEXUS DATA block."""

    def writeNexusBlock(self, out):
        """Write this block to `out` as a non-interleaved DATA block.

        Labels are escaped with NexusStr.escape and left-justified to the
        width of the longest one; each row is written with `%s`.
        """
        tax_labels = self.get_tax_labels()
        ntax = len(tax_labels)
        assert(ntax == self.ntax and ntax == len(self.matrix))
        nchar = self.nchar
        assert(nchar == len(self.matrix[0]))
        out.write('BEGIN DATA;\n\tDimensions ntax = %d nchar = %d;\n\tFormat datatype = %s gap = -;\n\tmatrix\n' % (ntax, nchar, self.dataType))
        tokenizedTaxLabels = [NexusStr.escape(label) for label in tax_labels]
        maxLabelLen = max([len(label) for label in tokenizedTaxLabels])
        formatStr = '%%-%ds %%s\n' % maxLabelLen
        for label, row in zip(tokenizedTaxLabels, self.matrix):
            out.write(formatStr % (label, row))
        out.write(';\nEND;\n')


def initializeCharDataBlock():
    """Attach the command readers to NexusCharactersBlock and NexusDataBlock.

    Does nothing if NexusCharactersBlock.cmdHandlers is already populated.
    Also derives NexusCharactersBlock.rnaEquates from DNASeq.equates by
    replacing t/T with u/U.
    """
    charsBlock = NexusCharactersBlock
    if charsBlock.cmdHandlers:
        return
    isPositive = NexusSubCommandReader.isPositiveNumber

    charsBlock.dimensionsCommand = NexusCommandReader('Dimensions', [
        NexusBoolSubCommandReader('NewTaxa', False),
        NexusIntSubCommandReader('ntax', 0, isPositive),
        NexusIntSubCommandReader('nchar', 0, isPositive),
    ])
    charsBlock.formatCommand = NexusCommandReader('Format', [
        NexusChoiceSubCommandReader('DataType', ['Standard', 'DNA', 'RNA', 'Nucleotide', 'Protein', 'Continuous'], 0, None),
        NexusChoiceSubCommandReader('Items', ['Min', 'Max', 'Median', 'Average', 'Var', 'SampleSize', 'States'], -1, None),
        NexusChoiceSubCommandReader('StateFormat', ['StatesPresent', 'Individuals', 'Count', 'Frequency'], -1, None),
        NexusBoolSubCommandReader('Tokens', False),
        NexusBoolSubCommandReader('RespectCase', False),
        NexusBoolSubCommandReader('Transpose', False),
        NexusBoolSubCommandReader('interleave', False),
        NexusBoolSubCommandReader('labels', True),
        NexusStringSubCommandReader('Gap', None, charsBlock.isValidGapChar),
        NexusStringSubCommandReader('MatchChar', None, charsBlock.isValidMatchChar),
        NexusStringSubCommandReader('Symbols', '', charsBlock.isValidSymbolsList),
        EquateSubCommandReader(),
        NexusStringSubCommandReader('Missing', '?', charsBlock.isValidMissingChar),
    ], charsBlock.formatCommandIsValid)
    charsBlock.eliminateCommand = NexusUnsupportedCommand('Eliminate')
    charsBlock.taxLabelsCommand = NexusCommandReader('TaxLabels', readerToCreate=taxa_block.NexusTaxLabelsReader)
    charsBlock.charStateLabelsCommand = NexusUnsupportedCommand('CharStateLabels')
    charsBlock.charLabelsCommand = NexusUnsupportedCommand('CharLabels')
    charsBlock.stateLabelsCommand = NexusUnsupportedCommand('StateLabels')
    charsBlock.matrixCommand = NexusCommandReader('Matrix', readerToCreate=NexusMatrixReader)
    charsBlock.cmdHandlers = [
        charsBlock.dimensionsCommand,
        charsBlock.formatCommand,
        charsBlock.eliminateCommand,
        charsBlock.taxLabelsCommand,
        charsBlock.charStateLabelsCommand,
        charsBlock.charLabelsCommand,
        charsBlock.stateLabelsCommand,
        charsBlock.matrixCommand,
    ]

    # The DATA block gets its own Dimensions reader (no NewTaxa subcommand)
    # and shares every other handler with the CHARACTERS block.
    NexusDataBlock.dimensionsCommand = NexusCommandReader('Dimensions', [
        NexusIntSubCommandReader('ntax', 0, isPositive),
        NexusIntSubCommandReader('nchar', 0, isPositive),
    ])
    NexusDataBlock.cmdHandlers = [NexusDataBlock.dimensionsCommand] + charsBlock.cmdHandlers[1:]

    t_to_u = string.maketrans('tT', 'uU')
    charsBlock.rnaEquates = [(i[0].translate(t_to_u, ''), i[1].translate(t_to_u, ''))
                             for i in DNASeq.equates]


initializeCharDataBlock()