#!/usr/bin/python from primitives import * from command_reader import * import re class NexusTaxLabelsReader: taxNamePattern = re.compile(r'.*[a-zA-Z0-9]+.*') def __init__(self, effectiveTaxaBlock, supportUnknownDimension = True): self.effTaxaBlock = effectiveTaxaBlock self.allowUnknownDimension = supportUnknownDimension def readCommand(self, taxLabelsToken, cStream, obj = None, blockObj = None): assert(blockObj is not None) self.effTaxaBlock = blockObj if 'taxaManager' in self.effTaxaBlock.__dict__: taxaManager = self.effTaxaBlock.taxaManager else: taxaManager = self.effTaxaBlock if self.effTaxaBlock and ('ntax' not in self.effTaxaBlock.__dict__ or self.effTaxaBlock.ntax < 1): if not self.allowUnknownDimension: raise NexusAfterTokenError(taxLabelsToken, 'The number of taxa must be specified (in a DIMENSIONS command) before taxlabels can be read.') nLabels = None else: nLabels = self.effTaxaBlock.ntax tokStream = cStream.getTokenStream() lab = tokStream.next() newLabels = [] while str(lab) != ';': if nLabels and len(newLabels) >= nLabels: raise NexusAfterTokenError(lab, 'Expecting a ; (Check that the number of taxa specified is correct).') newLabels.append(str(lab)) lab = tokStream.next() if nLabels and len(newLabels) < nLabels: raise NexusAfterTokenError(lab, 'Unexpected ; (Check that the number of taxa specified is correct).') taxaManager.addTaxa(newLabels) return True class NexusTaxaBlock(NexusBlock, ContainedTaxaManager): cmdHandlers = [ NexusCommandReader('Dimensions', [NexusIntSubCommandReader('ntax', 0, NexusSubCommandReader.isPositiveNumber)]), NexusCommandReader('TaxLabels', readerToCreate = NexusTaxLabelsReader) ] def endBlockEncountered(self): sd = self.__dict__ if 'taxLabels' not in sd and 'ntax' in sd and 'taxaManager' not in sd : self.taxLabels = [str(i+1) for i in range(self.ntax)] def __init__(self, beginCmd = None, commandStream = None, previousBlocks = None, **kwds): '''In kwds uses taxaManager if present, if not used taxaNamingStyle and taxLabels.''' self.taxaManager = getTaxaManagerFromDictArgs(kwds) if commandStream is None: self.prepareToRead(previousBlocks or []) NexusBlock.__init__(self, beginCmd, commandStream, previousBlocks or []) def getTaxaFromNexus(inF, getTaxaFromAllPublic = True, **kwdsToTaxaBlock): '''returns first taxa list from the stream.''' if getTaxaFromAllPublic: import public_blocks handlerDict = public_blocks.ALL_PUBLIC_BLOCKS public_blocks.addBlockInitializationArgs(handlerDict, ['TAXA'], kwdsToTaxaBlock) else: handlerDict = {'TAXA': (NexusTaxaBlock, kwdsToTaxaBlock) } for b in NexusBlockStream(inF, handlerDict, True, []): try: tl = b.getTaxLabels() if len(tl) > 0: return tl except AttributeError: pass return [] def getTaxaFromNexusString(s, getTaxaFromAllPublic = True, **kwdsToTaxaBlock): import cStringIO return getTaxaFromNexus(cStringIO.StringIO(s), getTaxaFromAllPublic, **kwdsToTaxaBlock) def getTaxaFromNexusFileName(inFilename, getTaxaFromAllPublic = True, **kwdsToTaxaBlock): NexusParsing.pushCurrentFile(inFilename) inF = open(inFilename, 'rU') tList = getTaxaFromNexusFileName(inF, getTaxaFromAllPublic, **kwdsToTaxaBlock) NexusParsing.popCurrentFile() return tList