#!/usr/bin/python from primitives import * from command_reader import * import re class NexusTaxLabelsReader: taxNamePattern = re.compile(r'.*[a-zA-Z0-9]+.*') def __init__(self, effectiveTaxaBlock, supportUnknownDimension = True): self.effTaxaBlock = effectiveTaxaBlock self.allowUnknownDimension = supportUnknownDimension def readCommand(self, taxLabelsToken, cStream, obj = None, blockObj = None): assert(blockObj is not None) self.effTaxaBlock = blockObj if 'taxa_mgr' in self.effTaxaBlock.__dict__: taxa_mgr = self.effTaxaBlock.taxa_mgr else: taxa_mgr = self.effTaxaBlock if self.effTaxaBlock and ('ntax' not in self.effTaxaBlock.__dict__ or self.effTaxaBlock.ntax < 1): if not self.allowUnknownDimension: msg = "The number of taxa must be specified (in a DIMENSIONS"\ "command) before taxlabels can be read." raise NexusError(msg, token=taxLabelsToken) nLabels = None else: nLabels = self.effTaxaBlock.ntax tok_stream = cStream.getTokenStream() lab = tok_stream.next() new_labels = [] while str(lab) != ';': if nLabels and len(new_labels) >= nLabels: msg = "Expecting a ; (Check that the number of taxa specified"\ " is correct)." raise NexusError(, token=lab) new_labels.append(str(lab)) lab = tok_stream.next() if nLabels and len(new_labels) < nLabels: msg = "Unexpected ; (Check that the number of taxa specified"\ " is correct).' raise NexusError(msg, token=lab) taxa_mgr.add_taxa(new_labels) return True class NexusTaxaBlock(NexusBlock, ContainedTaxaManager): cmdHandlers = [ NexusCommandReader('Dimensions', [NexusIntSubCommandReader('ntax', 0, NexusSubCommandReader.isPositiveNumber)]), NexusCommandReader('TaxLabels', readerToCreate = NexusTaxLabelsReader) ] def endBlockEncountered(self): sd = self.__dict__ if 'tax_labels' not in sd and 'ntax' in sd and 'taxa_mgr' not in sd : self.tax_labels = [str(i+1) for i in range(self.ntax)] def __init__(self, beginCmd = None, commandStream = None, previousBlocks = None, **kwds): """In kwds uses taxa_mgr if present, if not used tax_naming_style and tax_labels.""" self.taxa_mgr = get_taxa_manager_from_args(kwds) if commandStream is None: self.prepareToRead(previousBlocks or []) NexusBlock.__init__(self, beginCmd, commandStream, previousBlocks or []) def getTaxaFromNexus(inF, getTaxaFromAllPublic = True, **kwdsToTaxaBlock): """returns first taxa list from the stream.""" if getTaxaFromAllPublic: import public_blocks handlerDict = public_blocks.ALL_PUBLIC_BLOCKS public_blocks.addBlockInitializationArgs(handlerDict, ['TAXA'], kwdsToTaxaBlock) else: handlerDict = {'TAXA': (NexusTaxaBlock, kwdsToTaxaBlock) } for b in NexusBlockStream(inF, handlerDict, True, []): try: tl = b.get_tax_labels() if len(tl) > 0: return tl except AttributeError: pass return [] def getTaxaFromNexusString(s, getTaxaFromAllPublic = True, **kwdsToTaxaBlock): import cStringIO return getTaxaFromNexus(cStringIO.StringIO(s), getTaxaFromAllPublic, **kwdsToTaxaBlock) def getTaxaFromNexusFileName(inFilename, getTaxaFromAllPublic = True, **kwdsToTaxaBlock): NexusParsing.pushCurrentFile(inFilename) inF = open(inFilename, 'rU') tList = getTaxaFromNexusFileName(inF, getTaxaFromAllPublic, **kwdsToTaxaBlock) NexusParsing.popCurrentFile() return tList