#!/usr/bin/python """ """ import copy, sys from primitives import * class NexusParsing: statusStream = None fileStack = [] currentFile = '' def statusMessage(s): if NexusParsing.statusStream is not None: NexusParsing.statusStream.write(' ...%s' % s) NexusParsing.statusStream.flush() statusMessage = staticmethod(statusMessage) def pushCurrentFile(f): if NexusParsing.currentFile != '': NexusParsing.fileStack.append(NexusParsing.currentFile) currentFile = f pushCurrentFile = staticmethod(pushCurrentFile) def popCurrentFile(): NexusParsing.currentFile = len(NexusParsing.fileStack) > 0 and NexusParsing.fileStack.pop(-1) or '' popCurrentFile = staticmethod(popCurrentFile) class PosTriple: tabWidth = 4 def __init__(self, p, l, c): self.pos = p self.line = l self.column = c def next_char(self, c, prev): """returns length of previous line (if c is a newline)""" self.pos += 1 if c == '\n' or c =='\r': ret = self.column self.column = 0 if c == '\n' and prev == '\r': return -1 self.line += 1 return ret if c == '\t': self.column += PosTriple.tabWidth else: self.column += 1 return -1 def __str__(self): return 'pos=%d line=%d col=%d' % (self.pos, self.line, self.column) class NexusCommand: def __init__(self, nameToken, tokenStream): self.name = nameToken self.start_pos = nameToken.start_pos self.optionList = [] for t in tokenStream: if t == ';': self.end_pos = t.end_pos return self.optionList.append(t) raise NexusOpenCommandError(str(nameToken), token=nameToken) def __str__(self): s = ' '.join([str(o) for o in self.optionList]) return '%s\n\t%s;' % (self.name, s) class NexusCommandStream: def __init__(self, file_obj): self.nexusTokStream = NexusTokenStream(file_obj) self.firstCmd = True self.nextCommandName = None def next(self): cmdName = self.getNextCommandName() self.nextCommandName = None return NexusCommand(cmdName, self.nexusTokStream) def __iter__(self): while True: yield self.next() def getNextCommandName(self): if self.nextCommandName is None: self.nextCommandName = self.nexusTokStream.next() if self.firstCmd == True and self.nextCommandName == '#NEXUS': self.nextCommandName = self.nexusTokStream.next() self.firstCmd == False while self.nextCommandName == ';': self.nextCommandName = self.nexusTokStream.next() return self.nextCommandName def demandCommandEnd(self): self.nexusTokStream.require_next_token(';') self.nextCommandName = None def getTokenStream(self): """Asking for the token stream invalidates the nextCommandName field.""" self.nextCommandName = None return self.nexusTokStream def skipCommand(self): cmdName = self.getNextCommandName() self.nextCommandName = None for t in self.nexusTokStream: if t == ';': return class NexusBlock: cmdHandlers = [] def __init__(self, beginCmd = None, commandStream = None, previousBlocks = None): if beginCmd is not None: self.name = str(beginCmd.optionList[0]) self.start_pos = beginCmd.name.start_pos if commandStream is not None: self.prepareToRead(previousBlocks or []) self.parseBlock(commandStream) def prepareToRead(self, previousBlocks): """Called after the class of the block is determined. Hook for initialization of derived classes).""" pass def endBlockEncountered(self): pass def skipBlock(self, commandStream): self.commands = [] try: while True: nextName = commandStream.getNextCommandName() if str(nextName).upper() == 'END': c = commandStream.next() self.end_pos = c.end_pos break else: commandStream.skipCommand() except StopIteration: raise NexusOpenBlockError(self.name, start_pos=self.start_pos) def parseBlock(self, commandStream): self.commands = [] try: while True: nextName = commandStream.getNextCommandName() if str(nextName).upper() == 'END': c = commandStream.next() self.end_pos = c.end_pos self.endBlockEncountered() break self.interpretCommand(nextName, commandStream) except StopIteration: raise NexusOpenBlockError(self.name, start_pos=self.start_pos) def interpretCommand(self, nextName, commandStream): for ch in self.__class__.cmdHandlers: if ch.attemptRead(nextName, commandStream, self, self): return self.commands.append(commandStream.next()) def getCommandStream(self): return self.commandStream def getTokenStream(self): return self.commandStream.getTokenStream() def __str__(self): s = self.__dict__.has_key('commands') and '\n '.join([str(c) for c in self.commands]) or '' return 'BEGIN %s;\n %s\nEND;' % (self.name, s) class NexusBlockStream: def __init__(self, file_obj, blockHandlers = {}, skipUnknownBlocks = True, previousBlocks = None): self.blockHandlers = blockHandlers self.commandStream = NexusCommandStream(file_obj) self.skipUnknownBlocks = skipUnknownBlocks self.previousBlocks = previousBlocks or [] def next(self): handlerClass = None while handlerClass is None: beginCmd = self.commandStream.next() if beginCmd.name != 'BEGIN': raise NexusBareCommandError(beginCmd) if len(beginCmd.optionList) != 1: msg = "Expecting block name and then a semi colon after the "\ "BEGIN command" raise NexusError(msg, end_pos=beginCmd.name.end_pos) blockName = str(beginCmd.optionList[0]).upper() handlerTuple = self.blockHandlers.get(blockName) if handlerTuple is None: if self.skipUnknownBlocks: NexusParsing.statusMessage('%s Block is not currently supported. Skipping...' % blockName) self.skipBlock(self.commandStream) else: handlerClass = NexusBlock kwds = {} else: handlerClass= handlerTuple[0] kwds = handlerTuple[1] b = handlerClass(beginCmd, self.commandStream, copy.copy(self.previousBlocks), **kwds) self.previousBlocks.append(b) return b def skipBlock(self, commandStream): while self.commandStream.next().name != 'END': pass def __iter__(self): while True: yield self.next() def getCommandStream(self): return self.commandStream def getTokenStream(self): return self.commandStream.getTokenStream() def getNexusTokenObjects(s): return [i for i in NexusTokenStream(s)] def getNexusTokens(s): return [str(i) for i in NexusTokenStream(s)] if __name__ == '__main__': from cipres.util.io import initLogger initLogger() import doctest, sys doctest.testmod(sys.modules[__name__])