#!/usr/bin/python
'''
Supplies commonly used classes (utility and exceptions) and functions used
through nexus parsing code (may be imported as *).
'''
import copy
try:
    import cStringIO
except ImportError:
    # Python 3: io.StringIO offers the same write()/getvalue() interface.
    import io as cStringIO
import string

try:
    from PIPRes.util.io import cipresGetLogger
except ImportError:
    # Fall back to the stdlib logger so these primitives remain importable
    # outside of a complete PIPRes installation.
    import logging
    cipresGetLogger = logging.getLogger

_LOG = cipresGetLogger('pipres.nexus.primitives')


class TaxaNamingEnum:
    '''Enumeration of naming styles:
        validLabels disallows all-numeric labels,
        acceptNumbers allows numbers to be used as labels
        numbersOnly expects only integers which are interpreted as 1 + indices of the taxa
        indicesOnly expects only integers which are interpreted as the indices of the taxa
        labelsOrNumbers tolerates labels and interprets all-numeric names as 1 + indices'''
    validLabels, acceptNumbers, numbersOnly, indicesOnly, labelsOrNumbers = range(5)


def createTaxaManager(namingStyle=TaxaNamingEnum.acceptNumbers, taxLabels=(), taxSets=None):
    '''Factory returning the taxa manager class that matches `namingStyle`.

    namingStyle is a TaxaNamingEnum value, taxLabels the initial labels and
    taxSets an optional mapping of taxset name to its translation.
    Raises ValueError if namingStyle is not a TaxaNamingEnum value (label
    validation errors are NexusIllegalName, which is also a ValueError).
    '''
    if namingStyle == TaxaNamingEnum.validLabels:
        return StrictTaxaManager(taxLabels, taxSets)
    if namingStyle == TaxaNamingEnum.acceptNumbers:
        return AllowNumbersTaxaManager(taxLabels, taxSets)
    if namingStyle in (TaxaNamingEnum.numbersOnly, TaxaNamingEnum.indicesOnly):
        oneBased = (namingStyle == TaxaNamingEnum.numbersOnly)
        manager = RequireNumbersTaxaManager(-1, taxSets, oneBasedIndexing=oneBased)
        # Let the manager validate the labels and derive the largest index;
        # this respects the indexing base and works for any label order.
        manager.addTaxa(taxLabels)
        return manager
    if namingStyle == TaxaNamingEnum.labelsOrNumbers:
        return LabelsOrNumbersTaxaManager(taxLabels, taxSets)
    raise ValueError('Unknown taxon naming style (%s)' % namingStyle)


def getTaxaManagerFromDictArgs(kwargs):
    '''Called by objects that need to reference or create a new taxa manager
    using standard keyword argument names.

    Returns kwargs['taxaManager'] if present (and not None); otherwise creates
    a manager from the 'taxaNamingStyle' and 'taxLabels' entries.'''
    taxaManager = kwargs.get('taxaManager')
    if taxaManager is not None:
        return taxaManager
    style = kwargs.get('taxaNamingStyle', TaxaNamingEnum.acceptNumbers)
    return createTaxaManager(style, taxLabels=kwargs.get('taxLabels', []))


class NexusTaxaValidationMode:
    '''Selects which NexusTaxaManager label validator is applied.'''
    validateAsNumberOrInt, validateAsNexusLabel = range(2)


class NexusTaxaManager(object):
    '''Holds an ordered list of taxon labels (and optionally taxsets) and
    translates labels to indices.'''

    maxAnonymousNumericLabelAllowed = 10000000

    @staticmethod
    def isValidNexusLabelOrInteger(s):
        '''True unless s is empty or is a single punctuation/whitespace character.'''
        return len(s) > 1 or s not in '\'~!@#$%^&*(){}[]-_+=?/":;,.><|` \t\n'

    @staticmethod
    def isValidNexusLabel(s):
        '''Like isValidNexusLabelOrInteger, but also rejects all-digit labels.'''
        return NexusTaxaManager.isValidNexusLabelOrInteger(s) and not s.isdigit()

    def __init__(self, taxLabels=(), taxSets=None, labelValidatorMode=NexusTaxaValidationMode.validateAsNumberOrInt):
        self.taxLabels = []
        self._validationMode = labelValidatorMode
        assert self._validationMode in [NexusTaxaValidationMode.validateAsNumberOrInt, NexusTaxaValidationMode.validateAsNexusLabel]
        if len(taxLabels) > 0:
            self.addTaxa(taxLabels)
        if taxSets:
            # the attribute is only created when taxsets were supplied
            self.taxSets = taxSets

    @staticmethod
    def isDuplicateLabel(newLabel, prevLabels):
        '''True if newLabel matches (case-insensitively) any label in prevLabels.'''
        nUpper = str(newLabel).upper()
        return any(x.upper() == nUpper for x in prevLabels)

    def validateTaxonName(self, newLabel, prevLabels=()):
        '''Returns True if newLabel is legal and not in prevLabels; raises
        NexusIllegalName otherwise.'''
        if NexusTaxaManager.isDuplicateLabel(newLabel, prevLabels):
            raise NexusIllegalName('Taxon', 'The taxon label %s was repeated.' % newLabel)
        if self._validationMode == NexusTaxaValidationMode.validateAsNexusLabel:
            v = NexusTaxaManager.isValidNexusLabel(newLabel)
        elif self._validationMode == NexusTaxaValidationMode.validateAsNumberOrInt:
            v = NexusTaxaManager.isValidNexusLabelOrInteger(newLabel)
        else:
            raise AssertionError('Invalid taxon validation mode (%s)' % str(self._validationMode))
        if not v:
            raise NexusIllegalName('Taxon', 'The taxon label "%s" is illegal.' % newLabel)
        return True

    def addTaxa(self, newLabels):
        '''Validates and appends each label (converted with str()).
        Raises AssertionError if the manager has been finalized.'''
        if self.taxaAreFinal() and len(newLabels) > 0:
            raise AssertionError('Adding taxa to finalized taxa manager.')
        for labelObj in newLabels:
            label = str(labelObj)
            self.validateTaxonName(label, self.taxLabels)
            #_LOG.warn('NexusTaxaManager.Adding %s' % label)
            self.taxLabels.append(label)

    def taxaAreFinal(self):
        '''returns true if 'finalizeTaxa' has been called to lock the TaxaManager'''
        return self.__dict__.get('taxaFinalized', False)

    def finalizeTaxa(self):
        self.taxaFinalized = True

    def translateTaxLabel(self, tLabel):
        '''Returns the index for tLabel, looking first in the taxon labels and
        then in taxSets (if that attribute exists).
        Raises NexusUnknownTaxonError if the label is found in neither.'''
        tl = self.getTaxLabels()
        try:
            return findNexusIndex(tLabel, tl, 'taxon')
        except NexusAfterTokenError:
            pass
        labStr = str(tLabel)
        if 'taxSets' in self.__dict__ and labStr in self.taxSets:
            return self.taxSets[labStr]
        raise NexusUnknownTaxonError(tLabel, tl)

    def getNTax(self):
        return len(self.getTaxLabels())

    def getTaxLabel(self, ind):
        #_LOG.warn('looking for element %d in [%s]' %(ind, ', '.join(self.getTaxLabels())))
        return self.getTaxLabels()[ind]

    def getTaxLabels(self):
        return self.taxLabels

    def __len__(self):
        return self.getNTax()

    def __getitem__(self, item):
        if isinstance(item, slice):
            indices = item.indices(self.getNTax())
            return [self.getTaxLabel(i) for i in range(*indices)]
        return self.getTaxLabel(item)

    def __iter__(self):
        for l in self.getTaxLabels():
            yield l

    # Dispatch through self so that subclasses overriding getNTax are honored.
    nTax = property(lambda self: self.getNTax())


class StrictTaxaManager(NexusTaxaManager):
    '''disallows all-numeric labels.'''
    def __init__(self, taxLabels=(), taxSets=None):
        NexusTaxaManager.__init__(self, taxLabels, taxSets, labelValidatorMode=NexusTaxaValidationMode.validateAsNexusLabel)


class AllowNumbersTaxaManager(NexusTaxaManager):
    '''Allows numbers as labels (validated with
    NexusTaxaManager.isValidNexusLabelOrInteger).'''
    def __init__(self, taxLabels=(), taxSets=None):
        NexusTaxaManager.__init__(self, taxLabels, taxSets, labelValidatorMode=NexusTaxaValidationMode.validateAsNumberOrInt)


class LabelsOrNumbersTaxaManager(NexusTaxaManager):
    '''Accepts labels and numbers.  All-digit names are interpreted as
    (1 + the taxon's index); non-numeric labels are stored in order.'''
    def __init__(self, taxLabels, taxSets=None):
        self.maxIndex = -1
        self.taxLabels = []
        NexusTaxaManager.__init__(self, taxLabels, taxSets)

    def validateTaxonName(self, newLabel, prevLabels=()):
        '''Returns True if newLabel is an in-range number or a label that is
        not (case-insensitively) in prevLabels; raises NexusIllegalName otherwise.'''
        label = str(newLabel)
        if label.isdigit():
            intLabel = int(label) - 1
            if intLabel < 0 or intLabel >= NexusTaxaManager.maxAnonymousNumericLabelAllowed:
                raise NexusIllegalName('Taxon', 'Numeric taxon label is out of range (1, %ld).' % NexusTaxaManager.maxAnonymousNumericLabelAllowed)
        elif NexusTaxaManager.isDuplicateLabel(label, prevLabels):
            raise NexusIllegalName('Taxon', 'The taxon label %s was duplicated.' % label)
        return True

    def addTaxa(self, newLabels):
        '''Numeric labels only raise maxIndex; other labels are appended to
        taxLabels.  Raises AssertionError if the manager has been finalized.'''
        if self.taxaAreFinal() and len(newLabels) > 0:
            raise AssertionError('Adding taxa to finalized taxa manager.')
        for labelObj in newLabels:
            label = str(labelObj)
            self.validateTaxonName(label, self.taxLabels)
            if label.isdigit():
                self.maxIndex = max(self.maxIndex, int(label) - 1)
            else:
                self.taxLabels.append(label)
        self.maxIndex = max(self.maxIndex, len(self.taxLabels) - 1)

    def translateTaxLabel(self, label):
        '''Returns the index for a label, a taxset entry, or a one-based number.
        Raises NexusUnknownTaxonError if it cannot be translated.'''
        labStr = str(label)
        if not labStr.isdigit():
            try:
                return findNexusIndex(label, self.taxLabels, 'taxon')
            except NexusAfterTokenError:
                pass
            if 'taxSets' in self.__dict__ and labStr in self.taxSets:
                return self.taxSets[labStr]
        else:
            intLabel = int(labStr) - 1
            if 0 <= intLabel <= self.maxIndex:
                return intLabel
        raise NexusUnknownTaxonError(label, self.getTaxLabels())

    def getNTax(self):
        return self.maxIndex + 1

    def getTaxLabel(self, ind):
        if ind < len(self.taxLabels):
            return self.taxLabels[ind]
        if ind > self.maxIndex:
            raise IndexError('taxon index out of range')
        return str(ind + 1)

    def getTaxLabels(self):
        '''Stored labels followed by the numbers that name the remaining indices.'''
        if self.maxIndex < 0:
            return []
        labels = copy.copy(self.taxLabels)
        labels.extend([str(1 + i) for i in range(len(self.taxLabels), self.maxIndex + 1)])
        return labels


class RequireNumbersTaxaManager(NexusTaxaManager):
    '''Only accepts numbers as names and interprets the number as
    (firstIndex + the taxon's index); firstIndex is 1 when oneBasedIndexing
    is true and 0 otherwise.'''
    def __init__(self, maxIndex=-1, taxSets=None, oneBasedIndexing=True):
        self.firstIndex = 1 if oneBasedIndexing else 0
        self.maxIndex = maxIndex
        NexusTaxaManager.__init__(self, [], taxSets)

    def validateTaxonName(self, label, prevLabels=()):
        '''Returns True for an in-range numeric label; raises NexusIllegalName
        otherwise.  prevLabels is ignored.'''
        if not label.isdigit():
            raise NexusIllegalName('Taxon', 'Only numeric taxon labels are accepted (%s found).' % label)
        intLabel = int(label) - self.firstIndex
        if intLabel < 0 or intLabel >= NexusTaxaManager.maxAnonymousNumericLabelAllowed:
            raise NexusIllegalName('Taxon', 'Numeric taxon label is out of range (1, %ld).' % NexusTaxaManager.maxAnonymousNumericLabelAllowed)
        return True

    def addTaxa(self, newLabels):
        '''Validates each label and raises maxIndex to cover it.
        Raises AssertionError if the manager has been finalized.'''
        if self.taxaAreFinal() and len(newLabels) > 0:
            raise AssertionError('Adding taxa to finalized taxa manager.')
        for labelObj in newLabels:
            label = str(labelObj)
            self.validateTaxonName(label, ())
            self.maxIndex = max(self.maxIndex, int(label) - self.firstIndex)

    def translateTaxLabel(self, tLabel):
        '''Returns the index for a number, or the taxset entry for a name.
        Raises NexusUnknownTaxonError if it cannot be translated.'''
        labStr = str(tLabel)
        if not labStr.isdigit():
            if 'taxSets' in self.__dict__ and labStr in self.taxSets:
                return self.taxSets[labStr]
        else:
            intLabel = int(labStr) - self.firstIndex
            if 0 <= intLabel <= self.maxIndex:
                return intLabel
        raise NexusUnknownTaxonError(tLabel, self.getTaxLabels())

    def getNTax(self):
        return self.maxIndex + 1

    def getTaxLabel(self, ind):
        if ind > self.maxIndex:
            raise IndexError('taxon index (%d) out of range' % ind)
        return str(ind + self.firstIndex)

    def getTaxLabels(self):
        if self.maxIndex < 0:
            return []
        return [str(self.firstIndex + i) for i in range(self.maxIndex + 1)]

    def __contains__(self, i):
        s = str(i)
        if s.isdigit():
            ind = int(s) - self.firstIndex
            return 0 <= ind <= self.maxIndex
        return False

    def __ior__(self, other):
        self.maxIndex = max(self.maxIndex, other.maxIndex)
        return self


class ContainedTaxaManager(NexusTaxaManager):
    '''Useful for delegating calls to a taxaManager data member (classes can
    be derived from ContainedTaxaManager and not worry about delegating each
    call specifically; many objects have access to the taxaManager and might
    want to act as a proxy NexusTaxaManager).'''
    def taxaAreFinal(self):
        return self.taxaManager.taxaAreFinal()

    def validateTaxonName(self, label, prevLabels=()):
        return self.taxaManager.validateTaxonName(label, prevLabels)

    def finalizeTaxa(self):
        self.taxaManager.finalizeTaxa()

    def translateTaxLabel(self, tLabel):
        return self.taxaManager.translateTaxLabel(tLabel)

    def getNTax(self):
        return self.taxaManager.getNTax()

    def getTaxLabel(self, ind):
        return self.taxaManager.getTaxLabel(ind)

    def getTaxLabels(self):
        tl = self.taxaManager.getTaxLabels()
        #_LOG.warn('ContainedTaxaManager.getTaxLabels %s' % str(tl))
        return tl

    def addTaxa(self, newLabels):
        #_LOG.warn('ContainedTaxaManager.addTaxa %s' % str(newLabels))
        return self.taxaManager.addTaxa(newLabels)


class NexusError(ValueError):
    '''Base class of the parsing errors; s and e are the start and end
    positions (line numbers) and m is the message.'''
    def __init__(self, s=None, e=None, m=''):
        ValueError.__init__(self, m)
        self.startPos = s
        self.endPos = e
        self.message = m

    def __str__(self):
        if self.startPos is None:
            posInfo = 'position unknown'
        elif self.endPos is None:
            posInfo = 'starting at line %s' % self.startPos
        elif self.endPos == self.startPos:
            posInfo = 'at line %s' % self.startPos
        else:
            posInfo = 'from line %s to line %s' % (self.startPos, self.endPos)
        return 'Nexus Error: %s (%s)' % (self.message, posInfo)


class CharPosStream:
    '''raw class that iterates through a file object returning the character
    and the position (currently the line number) of the character's location'''
    def __init__(self, fileObj):
        #fullPosSupport self.currPos = PosTriple(0, 1, 0)
        self.currPos = 1 #fullPosSupport
        self.inputStream = fileObj
        self.prev = ''

    def readNextChar(self):
        '''Returns the next character; raises StopIteration at end of input.'''
        c = self.inputStream.read(1)
        if not c:
            raise StopIteration
        return c

    def __iter__(self):
        while True:
            try:
                c = self.readNextChar()
            except StopIteration:
                # A StopIteration escaping a generator body is an error
                # under PEP 479, so end the generator explicitly.
                return
            #fullPosSupport self.currPos.nextChar(c, self.prev)
            # "\r\n" is one line ending: do not count the "\n" that follows "\r".
            if c == '\r' or (c == '\n' and self.prev != '\r'):
                self.currPos += 1 #fullPosSupport
            self.prev = c
            yield c, self.currPos


class NexusCharStream:
    r'''Wraps CharPosStream to allow for:
        - translation of all line-endings (\r, \r\n, and \n) to \n
            (note: NexusCharStream.peek() may return \r)
        - peek capabilities
    '''
    def __init__(self, fileObj):
        self.charIter = iter(CharPosStream(fileObj))
        #fullPosSupport self.nextCharPos = PosTriple(0, 1, 0)
        self.nextCharPos = 1 #fullPosSupport
        self._advanceStoredNextChar()

    def _advanceStoredNextChar(self):
        '''Buffers the next (char, position) pair; nextChar is '' at end of input.'''
        try:
            self.nextChar, self.nextCharPos = next(self.charIter)
        except StopIteration:
            self.nextChar = ''

    def peek(self):
        '''Returns the upcoming character without consuming it ('' at end of input).'''
        return self.nextChar

    def __iter__(self):
        return self

    def next(self):
        '''Returns (char, position) and advances; raises StopIteration at end of input.'''
        c, p = self.nextChar, copy.copy(self.nextCharPos)
        if c == '':
            raise StopIteration
        self._advanceStoredNextChar()
        if c == '\r': # returns \n not \r
            if self.nextChar == '\n': # only return one \n for dos endings
                self._advanceStoredNextChar()
            c = '\n'
        return c, p

    __next__ = next  # Python 3 iterator protocol


class NexusToken:
    '''A single NEXUS token read from a NexusCharStream.  `chars` holds the
    token text, `comments` the text of the comments skipped while reading it,
    and startPos/endPos the positions reported by the stream.'''
    tokenBreakers = r';\'()]{}/\\,:=*"`+<>-'
    singleTokenChars = r'(){}"]/\\,;:=*`+<>-'
    try:
        identityTrans = string.maketrans('', '')
    except AttributeError:
        # Python 3 moved maketrans onto str; kept as a public attribute.
        identityTrans = str.maketrans('', '')

    def __init__(self, nexusCharStream=None):
        '''Reads the next token from nexusCharStream (if given).  StopIteration
        from the stream propagates when no token is left.'''
        self.comments = []
        if nexusCharStream is not None:
            c = ''
            while c.strip() == '':
                c, sPos = nexusCharStream.next()
                if c == '[':
                    # startPos is not set yet, so hand the position over explicitly
                    self.comments.append(self._skipComment(nexusCharStream, sPos))
                    c = ''
            self.startPos = sPos
            if c == "'":
                self._readRestOfSingleQuoted(nexusCharStream)
            elif c not in NexusToken.singleTokenChars:
                self._readRestOfToken(c, nexusCharStream)
            else:
                self.chars = c
                self.endPos = sPos

    def _readRestOfSingleQuoted(self, nexusCharStream):
        '''Reads up to the closing quote; a doubled quote is one literal quote.
        Raises NexusOpenQuoteError if the input ends first.'''
        tok = cStringIO.StringIO()
        try:
            while True:
                c, self.endPos = nexusCharStream.next()
                if c == "'":
                    if nexusCharStream.peek() != "'":
                        break
                    c, self.endPos = nexusCharStream.next()
                tok.write(c)
        except StopIteration:
            raise NexusOpenQuoteError(self.startPos)
        self.chars = tok.getvalue()

    def _skipComment(self, nexusCharStream, startPos=None):
        '''Consumes through the matching "]" and returns the comment text
        (nested comments are kept, with their brackets).
        startPos is the position reported if the comment is never closed; it
        defaults to the token's startPos.
        Raises NexusOpenCommentError if the input ends first.'''
        if startPos is None:
            startPos = getattr(self, 'startPos', None)
        cmt = cStringIO.StringIO()
        try:
            while True:
                c, self.endPos = nexusCharStream.next()
                if c == ']':
                    break
                if c == '[':
                    cmt.write('[%s]' % self._skipComment(nexusCharStream, startPos))
                else:
                    cmt.write(c)
        except StopIteration:
            raise NexusOpenCommentError(startPos)
        return cmt.getvalue()

    def _readRestOfToken(self, c, nexusCharStream):
        '''Reads an unquoted token whose first character is c.  Underscores
        become spaces and embedded comments are collected in self.comments.'''
        sPos = self.startPos
        tok = cStringIO.StringIO()
        try:
            while True:
                if c == '[':
                    self.comments.append(self._skipComment(nexusCharStream))
                else:
                    tok.write(' ' if c == '_' else c)
                    self.endPos = sPos
                n = nexusCharStream.peek()
                # whitespace, end of input ('') or a breaker ends the token
                if n.strip() == '' or n in NexusToken.tokenBreakers:
                    break
                c, sPos = nexusCharStream.next()
        except StopIteration:
            pass
        self.chars = tok.getvalue()

    def __str__(self):
        return self.chars

    def debugStr(self):
        return '->%s<- (from %s to %s)' % (self.chars, self.startPos, str(self.endPos))

    def __eq__(self, other):
        '''Tokens compare by text and position; anything else is compared
        case-insensitively by its str() form.'''
        if other.__class__ == NexusToken:
            return (self.chars, self.startPos, str(self.endPos)) == (other.chars, other.startPos, str(other.endPos))
        return str(self).upper() == str(other).upper()

    def __ne__(self, other):
        return not self == other

    @staticmethod
    def escapeString(s):
        '''Returns s in a form that NexusToken reads back as s: unchanged if
        possible, with spaces written as underscores, or single-quoted (with
        embedded quotes doubled) when s contains a token breaker, "_" or "[".'''
        special = NexusToken.tokenBreakers + '_['
        if any(ch in special for ch in s):
            return "'%s'" % s.replace("'", "''")
        # Only a plain space can be written as "_"; other whitespace needs quotes.
        if any(ch.isspace() and ch != ' ' for ch in s):
            return "'%s'" % s
        return s.replace(' ', '_')

    def getSpecialComments(self):
        '''Returns the comments whose first character is "&" or "!".'''
        return [i for i in self.comments if (len(i) > 0 and (i[0] == '&' or i[0] == '!'))]


class NexusAfterTokenError(NexusError):
    '''NexusError positioned at tok (when tok is a NexusToken with positions).'''
    def __init__(self, tok, m):
        if isinstance(tok, NexusToken):
            # a token built without a stream has no position attributes
            sP = getattr(tok, 'startPos', None) or None
            eP = getattr(tok, 'endPos', None) or None
        else:
            sP, eP = None, None
        NexusError.__init__(self, sP, eP, m)


class NexusOpenCommentError(NexusError):
    def __init__(self, startPos):
        NexusError.__init__(self, startPos, None, 'Unterminated comment')


class NexusOpenQuoteError(NexusError):
    def __init__(self, startPos):
        NexusError.__init__(self, startPos, None, 'Unterminated quoted string')


class NexusOpenCommandError(NexusError):
    def __init__(self, name, startPos):
        NexusError.__init__(self, startPos, None, 'Expecting ; to end the %s command' % name)


class NexusOpenBlockError(NexusError):
    def __init__(self, name, startPos):
        NexusError.__init__(self, startPos, None, 'Expecting "END;" to end the %s block' % name)


class NexusBareCommandError(NexusError):
    def __init__(self, c):
        NexusError.__init__(self, c.startPos, c.endPos, 'Expecting BEGIN command to start a new block (found %s)' % c.name)
        self.command = c


class NexusUnexpectedTokenError(NexusError):
    def __init__(self, c, tok):
        NexusError.__init__(self, tok.startPos, tok.endPos, 'Unexpected "%s" in %s' % (str(tok), c))


class NexusMissingTokenError(NexusError):
    def __init__(self, expected, tok):
        NexusError.__init__(self, tok.startPos, tok.endPos, 'Expecting %s but found %s' % (expected, str(tok)))


class NexusUnsupportedError(NexusError):
    def __init__(self, n, tok):
        NexusError.__init__(self, tok.startPos, tok.endPos, '%s is not currently supported' % (n))


class NexusIllegalName(NexusError):
    def __init__(self, labelType='', n='', tok=None):
        if tok and isinstance(tok, NexusToken):
            NexusError.__init__(self, tok.startPos, tok.endPos, '%s is not a legal %s label. %s' % (str(tok), labelType, n))
        else:
            NexusError.__init__(self, None, None, 'Illegal %s label. %s' % (labelType, n))


class NexusUnknownLabelError(ValueError):
    def __init__(self, lab, type='', knownLabels=None):
        m = 'Unknown %s label "%s"' % (type, lab)
        if len(knownLabels or []) > 0:
            m = m + ' (possible names = %s)' % str(knownLabels)
        ValueError.__init__(self, m)


class NexusUnknownTaxonError(NexusUnknownLabelError):
    def __init__(self, lab, knownLabels=None):
        NexusUnknownLabelError.__init__(self, lab, 'Taxon', knownLabels)


class BadTreeDefError(NexusAfterTokenError):
    def __init__(self, tok, m):
        NexusAfterTokenError.__init__(self, tok, 'Invalid tree definition: ' + m)


def notEqualOrNexusError(obj, s1, s2, pref, tok):
    '''Raises NexusAfterTokenError if obj[s1] is set and equals obj.get(s2).'''
    if obj.get(s1) is not None and obj[s1] == obj.get(s2):
        raise NexusAfterTokenError(tok, '%s %s cannot equal %s' % (pref, s1, s2))


def index(item, searchList, op):
    '''Returns the index of the first element el of searchList for which
    op(item, el) is true.  Raises ValueError if there is none.'''
    for i, el in enumerate(searchList):
        if op(item, el):
            return i
    raise ValueError('index(item, searchList, op): item is not in searchList')


def findNexusIndex(labelTok, labels, type=''):
    '''Returns the index of labelTok in labels (case-insensitive match).  An
    all-digit name that is not itself a label is read as a one-based index.
    Raises NexusAfterTokenError if the label cannot be resolved.'''
    searchName = str(labelTok).upper()
    try:
        return index(searchName, labels, lambda c, u: c == u.upper())
    except ValueError:
        if searchName.isdigit():
            i = int(searchName)
            # 0 is not a valid one-based number (it would map to index -1)
            if 1 <= i <= len(labels):
                return i - 1
        raise NexusAfterTokenError(labelTok, '%s is not a known %s label' % (labelTok, type))


def stableUnique(seq, idfun=None):
    '''returns list with unique elements from seq.
    from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52560
    and when you can't lose order..., Alex Martelli, 2001/10/13
    unique() systematically loses order, but if items are hashable it's not
    hard to keep order intact - only eliminate "later" insertions of
    already-present items.  In case items _aren't_ hashable, for a big enough
    N using their cPickle.dumps() might be worth it... that's easily
    generalized to "uniqueness within equivalence classes" - parameter
    function idfun must return hashable objects which are == for and only for
    items that are duplicates.'''
    if idfun is None:
        def idfun(x):
            return x
    seen = set()
    result = []
    for item in seq:
        marker = idfun(item)
        if marker not in seen:
            seen.add(marker)
            result.append(item)
    return result


if __name__ == '__main__':
    from PIPRes.util.io import initLogger
    initLogger()
    import doctest, sys
    doctest.testmod(sys.modules[__name__])