#!/usr/bin/python
'''
Streams for reading NEXUS-formatted input at three levels of granularity:
tokens (NexusTokenStream), commands (NexusCommandStream) and blocks
(NexusBlockStream).

The character stream, the token class and the exception classes used here
(NexusCharStream, NexusToken, NexusError and its relatives) are not defined
in this file; they are expected to come from the ``primitives`` module.
'''
import copy
import cStringIO
import sys

from primitives import *


class NexusParsing:
    '''Process-wide parsing state: an optional status stream and a stack of
    file names.'''
    statusStream = None     # file-like object for progress messages, or None
    fileStack = []          # previously current file names (most recent last)
    currentFile = ''        # '' means "no current file"

    def statusMessage(s):
        '''Writes s (prefixed with ' ...') to statusStream and flushes it.
        Does nothing when statusStream is None.'''
        stream = NexusParsing.statusStream
        if stream is not None:
            stream.write(' ...%s' % s)
            stream.flush()
    statusMessage = staticmethod(statusMessage)

    def pushCurrentFile(f):
        '''Makes f the current file, saving the previous current file (if
        any) on fileStack.'''
        if NexusParsing.currentFile != '':
            NexusParsing.fileStack.append(NexusParsing.currentFile)
        # Must assign through the class; a bare name here would only bind a
        # local variable and leave NexusParsing.currentFile untouched.
        NexusParsing.currentFile = f
    pushCurrentFile = staticmethod(pushCurrentFile)

    def popCurrentFile():
        '''Restores the most recently saved file name as the current file,
        or '' when nothing is saved (or the saved value is falsy).'''
        if NexusParsing.fileStack:
            NexusParsing.currentFile = NexusParsing.fileStack.pop() or ''
        else:
            NexusParsing.currentFile = ''
    popCurrentFile = staticmethod(popCurrentFile)


class PosTriple:
    '''A position in the input: absolute offset, line and column.'''
    tabWidth = 4    # number of columns a tab character advances

    def __init__(self, p, l, c):
        self.pos = p
        self.line = l
        self.column = c

    def nextChar(self, c, prev):
        '''Advances the position past character c (prev is the character
        read before c).

        Returns the length of the line just ended when c is a newline
        character, and -1 otherwise.  A '\\n' that directly follows a '\\r'
        is treated as part of the same line break: it does not increment
        the line number and -1 is returned.
        '''
        self.pos += 1
        if c in ('\n', '\r'):
            lineLength = self.column
            self.column = 0
            if c == '\n' and prev == '\r':
                # second half of a "\r\n" pair; the '\r' already counted
                return -1
            self.line += 1
            return lineLength
        if c == '\t':
            self.column += PosTriple.tabWidth
        else:
            self.column += 1
        return -1

    def __str__(self):
        return 'pos=%d line=%d col=%d' % (self.pos, self.line, self.column)


class NexusCommand:
    '''A command: its name token plus every token up to the closing ';'.'''

    def __init__(self, nameToken, tokenStream):
        '''Reads tokens from tokenStream into optionList until a ';' token.

        Raises NexusOpenCommandError if tokenStream is exhausted before a
        ';' is found.
        '''
        self.name = nameToken
        self.startPos = nameToken.startPos
        self.optionList = []
        for tok in tokenStream:
            if tok == ';':
                self.endPos = tok.endPos
                return
            self.optionList.append(tok)
        # the loop only falls through when the stream ran dry
        raise NexusOpenCommandError(str(nameToken), nameToken.startPos)

    def __str__(self):
        options = ' '.join([str(o) for o in self.optionList])
        return '%s\n\t%s;' % (self.name, options)


class NexusCommandStream:
    '''Groups the tokens of a NexusTokenStream into NexusCommand objects.'''

    def __init__(self, fileObj):
        self.nexusTokStream = NexusTokenStream(fileObj)
        self.firstCmd = True            # True until the first name is read
        self.nextCommandName = None     # cached look-ahead command name

    def next(self):
        '''Returns the next NexusCommand (consuming its tokens).'''
        cmdName = self.getNextCommandName()
        self.nextCommandName = None
        return NexusCommand(cmdName, self.nexusTokStream)

    def __iter__(self):
        # StopIteration is caught explicitly so that exhaustion ends the
        # generator cleanly instead of leaking out of the generator frame.
        try:
            while True:
                yield self.next()
        except StopIteration:
            return

    def getNextCommandName(self):
        '''Returns the name token of the next command without consuming the
        rest of the command.  The token is cached until the command is read
        or skipped.

        A leading '#NEXUS' token (first command only) and stray ';' tokens
        are skipped.
        '''
        if self.nextCommandName is None:
            self.nextCommandName = self.nexusTokStream.next()
            if self.firstCmd and self.nextCommandName == '#NEXUS':
                self.nextCommandName = self.nexusTokStream.next()
            # '#NEXUS' is only skipped at the very start of the stream
            self.firstCmd = False
            while self.nextCommandName == ';':
                self.nextCommandName = self.nexusTokStream.next()
        return self.nextCommandName

    def demandCommandEnd(self):
        '''Requires the next token to be ';' (see
        NexusTokenStream.demandToken) and clears the cached command name.'''
        self.nexusTokStream.demandToken(';')
        self.nextCommandName = None

    def getTokenStream(self):
        '''Asking for the token stream invalidates the nextCommandName field.'''
        self.nextCommandName = None
        return self.nexusTokStream

    def skipCommand(self):
        '''Discards tokens up to and including the next ';'.'''
        # Called for its side effect: makes sure the command name has been
        # read from the token stream before the rest is discarded.
        self.getNextCommandName()
        self.nextCommandName = None
        for tok in self.nexusTokStream:
            if tok == ';':
                return


class NexusBlock:
    '''Generic block: stores every command up to END that no handler in
    cmdHandlers claims.  Derived classes can register handlers and override
    the prepareToRead / endBlockEncountered hooks.'''
    cmdHandlers = []    # objects providing attemptRead(); tried in order

    def __init__(self, beginCmd = None, commandStream = None, previousBlocks = None):
        '''beginCmd is the BEGIN command (its first option is used as the
        block name).  When commandStream is given the block body is parsed
        immediately.'''
        if beginCmd is not None:
            self.name = str(beginCmd.optionList[0])
            self.startPos = beginCmd.name.startPos
        if commandStream is not None:
            self.prepareToRead(previousBlocks or [])
            self.parseBlock(commandStream)

    def prepareToRead(self, previousBlocks):
        '''Called after the class of the block is determined.
        Hook for initialization of derived classes.'''
        pass

    def endBlockEncountered(self):
        '''Hook called by parseBlock after the END command has been read.'''
        pass

    def skipBlock(self, commandStream):
        '''Discards every command up to and including END.

        Raises NexusOpenBlockError if the stream ends first.
        '''
        self.commands = []
        try:
            while True:
                nextName = commandStream.getNextCommandName()
                if str(nextName).upper() == 'END':
                    endCmd = commandStream.next()
                    self.endPos = endCmd.endPos
                    break
                commandStream.skipCommand()
        except StopIteration:
            raise NexusOpenBlockError(self.name, self.startPos)

    def parseBlock(self, commandStream):
        '''Passes each command up to END to interpretCommand, then calls
        endBlockEncountered.

        Raises NexusOpenBlockError if the stream ends before END.
        '''
        self.commands = []
        try:
            while True:
                nextName = commandStream.getNextCommandName()
                if str(nextName).upper() == 'END':
                    endCmd = commandStream.next()
                    self.endPos = endCmd.endPos
                    self.endBlockEncountered()
                    break
                self.interpretCommand(nextName, commandStream)
        except StopIteration:
            raise NexusOpenBlockError(self.name, self.startPos)

    def interpretCommand(self, nextName, commandStream):
        '''Offers the command to each handler in the class's cmdHandlers;
        if none returns a true value the command is read and appended to
        self.commands.'''
        for handler in self.__class__.cmdHandlers:
            # self is passed twice; the meaning of the last two arguments is
            # defined by the handler's attemptRead, which is not in this file.
            if handler.attemptRead(nextName, commandStream, self, self):
                return
        self.commands.append(commandStream.next())

    def getCommandStream(self):
        # NOTE(review): nothing in this class assigns self.commandStream;
        # presumably derived classes set it -- confirm before relying on it.
        return self.commandStream

    def getTokenStream(self):
        return self.commandStream.getTokenStream()

    def __str__(self):
        if 'commands' in self.__dict__:
            body = '\n '.join([str(c) for c in self.commands])
        else:
            body = ''
        return 'BEGIN %s;\n %s\nEND;' % (self.name, body)


class NexusTokenStream:
    '''Iterator over the NexusToken objects read from a file-like object,
    with helpers for character-by-character and numeric reading.'''
    import re
    # digits with optional fraction and an optional (unsigned) exponent; the
    # token may end in a bare 'e'/'E' when the exponent sign and digits are
    # in following tokens
    _mantissaPattern = re.compile(r'\A\d+\.?\d*[eE]?\d*\Z|\A\.\d+[eE]?\d*\Z')
    # a complete number held in one token that contains a '-' sign
    _fullFloatPattern = re.compile(r'\A-(\d+\.?\d*|\.\d+)[eE]-?\d+\Z|\A(\d+\.?\d*|\.\d+)[eE]-\d+\Z')

    def __init__(self, fileObj):
        self.charByCharToken = None     # token currently served by nextChar
        self.cbcIndex = 0               # index of next char in that token
        self._nexusCharStream = NexusCharStream(fileObj)

    def next(self):
        '''Returns the next NexusToken built from the character stream.'''
        return NexusToken(self._nexusCharStream)

    def nextChar(self):
        '''Returns the next character of the current token's string form,
        reading further tokens as needed.'''
        while self.charByCharToken is None or self.cbcIndex >= len(str(self.charByCharToken)):
            self.charByCharToken = self.next()
            self.cbcIndex = 0
        ch = str(self.charByCharToken)[self.cbcIndex]
        self.cbcIndex += 1
        return ch

    def cbcTok(self):
        '''Returns the token nextChar is currently reading from (or None).'''
        return self.charByCharToken

    def cbcMoreCharacters(self):
        '''True if nextChar can return a character without reading a new
        token.'''
        return (self.charByCharToken is not None) and self.cbcIndex < len(str(self.charByCharToken))

    def demandToken(self, s):
        '''Reads one token; raises NexusMissingTokenError unless its
        upper-cased string form equals s.'''
        tok = self.next()
        if str(tok).upper() != s:
            raise NexusMissingTokenError(s, tok)

    def readUntil(self, s):
        '''Returns the list of tokens read before the first token whose
        upper-cased string form equals s (that token is consumed but not
        returned).'''
        collected = []
        tok = self.next()
        while str(tok).upper() != s:
            collected.append(tok)
            tok = self.next()
        return collected

    def __iter__(self):
        # StopIteration is caught explicitly so that exhaustion ends the
        # generator cleanly instead of leaking out of the generator frame.
        try:
            while True:
                yield self.next()
        except StopIteration:
            return

    def nextFloat(self):
        '''Returns a NexusToken whose chars hold a floating point number
        assembled from the next token(s) in the stream: an optional leading
        '-' token, the mantissa token and, when the mantissa ends in
        'e'/'E', a following integer exponent.

        Raises NexusMissingTokenError if the tokens do not form a number.
        '''
        tok = self.next()
        sForm = str(tok)
        makeNegative = sForm == '-'
        if makeNegative:
            # the sign must be immediately followed by the number
            breaker = self._nexusCharStream.peek()
            if not (breaker.isdigit() or breaker == '.'):
                raise NexusMissingTokenError('a number', tok)
            tok = self.next()
            sForm = str(tok)
        # sForm[:1] (rather than sForm[0]) keeps an empty token from raising
        # IndexError; it is reported as a missing number instead.
        if not NexusTokenStream._mantissaPattern.match(sForm) and (makeNegative or sForm[:1] != '-' or not NexusTokenStream._mantissaPattern.match(sForm[1:])):
            if NexusTokenStream._fullFloatPattern.match(sForm):
                # NOTE(review): when makeNegative is set the sign read from
                # the separate '-' token is not applied here -- confirm
                # whether the tokenizer can actually produce this case.
                return tok
            raise NexusMissingTokenError('a number', tok)
        if tok.chars[-1].upper() == 'E':
            # exponent sign/digits are in the following token(s)
            breaker = self._nexusCharStream.peek()
            if not (breaker.isdigit() or breaker == '-'):
                raise NexusMissingTokenError('a number', tok)
            exponent = self.nextInteger()
            tok.chars = tok.chars + exponent.chars
        if makeNegative:
            tok.chars = '-' + tok.chars
        return tok

    def nextInteger(self):
        ''' returns a NexusToken with chars holding an integer from the next
        token in the stream (or two tokens if the number is negative)

        Raises NexusMissingTokenError if the token(s) do not form an integer.
        '''
        tok = self.next()
        sForm = str(tok)
        makeNegative = sForm == '-'
        if makeNegative:
            # the sign must be immediately followed by a digit
            if not self._nexusCharStream.peek().isdigit():
                raise NexusMissingTokenError('an integer', tok)
            tok = self.next()
            sForm = str(tok)
        # sForm[:1] (rather than sForm[0]) keeps an empty token from raising
        # IndexError; it is reported as a missing integer instead.
        if not sForm.isdigit() and (makeNegative or sForm[:1] != '-' or not sForm[1:].isdigit()):
            raise NexusMissingTokenError('an integer', tok)
        if makeNegative:
            tok.chars = '-' + tok.chars
        return tok


class NexusBlockStream:
    '''Iterator over the blocks of a NEXUS file.'''

    def __init__(self, fileObj, blockHandlers = None, skipUnknownBlocks = True, previousBlocks = None):
        '''blockHandlers maps an upper-case block name to a tuple of
        (block class, dict of extra keyword arguments for that class).
        Blocks with no handler are skipped when skipUnknownBlocks is true
        and read as plain NexusBlock objects otherwise.
        '''
        # a fresh dict per instance; a {} default would be shared by all
        if blockHandlers is None:
            blockHandlers = {}
        self.blockHandlers = blockHandlers
        self.commandStream = NexusCommandStream(fileObj)
        self.skipUnknownBlocks = skipUnknownBlocks
        self.previousBlocks = previousBlocks or []

    def next(self):
        '''Reads and returns the next block that is not skipped.

        Raises NexusBareCommandError if a command other than BEGIN is found
        between blocks, and NexusError if BEGIN is not followed by exactly
        one token before the ';'.
        '''
        handlerClass = None
        while handlerClass is None:
            beginCmd = self.commandStream.next()
            if beginCmd.name != 'BEGIN':
                raise NexusBareCommandError(beginCmd)
            if len(beginCmd.optionList) != 1:
                raise NexusError(beginCmd.name.endPos, None, 'Expecting block name and then a semi colon after the BEGIN command')
            blockName = str(beginCmd.optionList[0]).upper()
            handlerTuple = self.blockHandlers.get(blockName)
            if handlerTuple is not None:
                handlerClass = handlerTuple[0]
                kwds = handlerTuple[1]
            elif self.skipUnknownBlocks:
                NexusParsing.statusMessage('%s Block is not currently supported. Skipping...' % blockName)
                self.skipBlock(self.commandStream)
            else:
                handlerClass = NexusBlock
                kwds = {}
        # each block gets its own (shallow) copy of the list of earlier blocks
        block = handlerClass(beginCmd, self.commandStream, copy.copy(self.previousBlocks), **kwds)
        self.previousBlocks.append(block)
        return block

    def skipBlock(self, commandStream):
        '''Reads and discards commands from commandStream up to and
        including END (compared the same way NexusBlock.skipBlock does).'''
        while str(commandStream.next().name).upper() != 'END':
            pass

    def __iter__(self):
        # StopIteration is caught explicitly so that exhaustion ends the
        # generator cleanly instead of leaking out of the generator frame.
        try:
            while True:
                yield self.next()
        except StopIteration:
            return

    def getCommandStream(self):
        return self.commandStream

    def getTokenStream(self):
        return self.commandStream.getTokenStream()


def iterNexusTokenizeStr(s):
    '''Returns a NexusTokenStream reading from the string form of s.'''
    return NexusTokenStream(cStringIO.StringIO(str(s)))


def getNexusTokenObjects(s):
    '''Returns the list of tokens found in the string form of s.'''
    return list(iterNexusTokenizeStr(s))


def getNexusTokens(s):
    '''Returns the string form of each token found in the string form of s.'''
    return [str(tok) for tok in iterNexusTokenizeStr(s)]


if __name__ == '__main__':
    from PIPRes.util.io import initLogger
    initLogger()
    import doctest, sys
    doctest.testmod(sys.modules[__name__])