#!/usr/bin/python '''Currently implements tree searching via a file-based wrapper around PAUP4.0b10''' import os import time from threading import Event, Thread, Lock from PIPRes.util.server_import import * from PIPRes.cipres_types import * from PIPRes.wrap.nexus_program_server import * from PIPRes.wrap.output_parser import * from PIPRes.service_impl.split_freq_supplier import SplitFreqSupplier from PIPRes.util.cipres import getCipresProperty from PIPRes.splits import splitToNewick from PIPRes.cipres_types import * from PIPRes.util.io import cipresGetLogger from PIPRes.event_consumer import EventSupplier, LineReadingThread _LOG = cipresGetLogger('pipres.service_impl.paup_wrap.py') class ConstraintType: DONT_ENFORCE, MONOPHYLETIC, BACKBONE, CONVEX, NOT_MONOPHYLETIC, NOT_BACKBONE, NOT_CONVEX = range(7) toPaupType = ['', 'MONOPHYLY', 'BACKBONE', 'CONVEX', 'MONOPHYLY', 'BACKBONE, CONVEX'] def isConverse(t): return t >= ConstraintType.NOT_MONOPHYLETIC isConverse = staticmethod(isConverse) class PaupDStatusReadingThread(LineReadingThread): """A thread that will read the input stream and call an function with the tree strings as they appear. `toCall` passing it (tree, score, model_str) as they appear when run is is called. The thread terminates if: - "end;.*" is encountered as a line. - the stop Event is set NOTE: this assumes that GARLI always writes the taxa in normal (ascending) order so that no translation needs to be done to get to the implied NEXUS numbering of taxa. """ # header and status line for searches that involve one replicate one_rep_stat_line_re = re.compile(r"\s*(\d+:\d+:\d+)\s+([-0-9]+)\s+([-0-9]+)\s+([-0-9]+)\s+([-0-9]+)\s+([ -.0-9]+)\s*") one_rep_pre_re = re.compile(r"\s+time\s+added\s+tried\s+saved\s+left-to-swap\s+tree\(s\)") # header and status line for searches that involve multiple replicates multi_rep_stat_line_re = re.compile(r"\s*(\d+:\d+:\d+)\s+([-0-9]+)\s+([-0-9])+\s+([-0-9]+)\s+([-0-9]+)\s+([-0-9]+)\s+([ -.0-9]+)\s+([ -.0-9]+)\s*") multi_rep_pre_re = re.compile(r"\s+time\s+rep\s+added\s+tried\s+saved\s+left-to-swap\s+this\s+rep\s+overall\s*") # patterns for the start and end of a dstatus table starting_re = re.compile(r"\s*\-{10,}\s*") ending_re = re.compile(r".*completed\s*") def __init__(self, toCall, filename, stop_event=None, READ_SLEEP_INTERVAL=0.5): """`toCall` is the callable that takes a (raw) tree string, score and model and returns a False value to terminate the reading. see LineReadingThread for other args """ self.toCall = toCall if not callable(self.toCall): raise ValueError("toCall arg must be callable") self.started = False self.ready_to_start = False self.one_rep = False LineReadingThread.__init__(self, lineCallback=None, stream=None, filename=filename, stopEvent=stop_event, READ_SLEEP_INTERVAL=READ_SLEEP_INTERVAL) def keep_going(self, line): if self.started: if self.one_rep: m = PaupDStatusReadingThread.one_rep_stat_line_re.match(line) if m: e_time, tax_add, rearr, n_tre, swap_left, score = m.groups() if rearr == "-" or rearr == "0": msg = "Constructing starting tree. Adding taxon %s." % tax_add else: msg = "Current score = %s. %s trees found. %s left to "\ "swap. %s rearrangements tried." % (score, n_tre, swap_left, rearr) msg = msg + " Elapsed time = %s" % e_time return self.toCall(msg) else: m = PaupDStatusReadingThread.multi_rep_stat_line_re.match(line) if m: e_time, rep, tax_add, rearr, n_tre, swap_left, rep_sc, score = m.groups() if score == "-": msg = "Constructing starting tree. Search replicate %s."\ " Adding taxon %s." % (rep, tax_add) else: msg = "Current score = %s. Search replicate %s. "\ "%s trees found. %s left to swap. "\ "%s Rearrangements tried." % (score, rep, n_tre, swap_left, rearr) msg = msg + " Elapsed time = %s" % e_time return self.toCall(msg) if PaupDStatusReadingThread.ending_re.match(line): self.started = False elif self.ready_to_start: if PaupDStatusReadingThread.starting_re.match(line): self.started = True else: self.ready_to_start = False elif PaupDStatusReadingThread.one_rep_pre_re.match(line): self.one_rep = True self.ready_to_start = True elif PaupDStatusReadingThread.multi_rep_pre_re.match(line): self.one_rep = False self.ready_to_start = True return True class PaupWrap(object, CipresIDL_api1__POA.TreeImprove, CipresIDL_api1__POA.TreeInfer, CipresIDL_api1__POA.TreeEvaluate, SimpleNexusWrapperServer): debugging = True def __init__(self, registry, paupPath): self.debugging = PaupWrap.debugging SimpleNexusWrapperServer.__init__(self, registry) self.convertTreeFunc = self.orb is None and toTreeBridge or toIDLTree self.registry = registry self.mat = None self.tree = None self.nexusReader = None settings = "set MaxTrees=10 NoIncrease ;" self.bootSearchDefault = settings + ' Default HSearch nomultrees swap=spr ' self.bootDefTemplate = settings + ' Default Boot nreps = %d cutoffpct=0 grpfreq=yes ' self.searchRNGSeeder = None self.bootRNGSeeder = None self.bootFreqFile = 'bootfreqs.log' self.nBootReps = 100 self.treeImproveDefault = settings + 'Default HSearch nomultrees swap=spr ' self.treeImproveDefault = self.treeImproveDefault + ' dstatus=30 ' self.treeInferDefault = settings + 'Default HSearch nomultrees swap=spr start=stepwise addseq=random' self.treeInferDefault = self.treeInferDefault + ' dstatus=30 ' self.treeEvaluateDefault = '' self.futurePaup = False # in future releases paup will support searching on partial leaf sets # if we aren't using future paup, we need to translate the taxon labels if not os.path.exists(paupPath): raise ValueError, 'The specified path to PAUP (%s) does not exist.' % paupPath self.paupPath = paupPath self.constraints = [] self.criterion = OptimalityCriterion.PARSIMONY self.model = None self.taxLabels = None self.taxManager = None self._toCallerTranslator = None self.translationObjLock = Lock() # lock to assure single thread access to translation data structs self.statusEventLock = Lock() # lock to assure single thread access to the event_supplier that posts self.event_supplier = None # self._statusEventConsumer = None # consumer that the event_supplier communicates with self.tempTreesToEventsThread = None # thread for sending tree events self._termEvent = None # threading.Event object that is used by the EventsThreads to check if GARLI is still running. def setCriterion(self, crit): if isinstance(crit, str): self._criterion = OptimalityCriterion.asStr.index(crit) else: OptimalityCriterion.asStr[crit] # raises IndexError if invalid self._criterion = crit def getCriterion(self): return self._criterion criterion = property(getCriterion, setCriterion) def getSearchSeedFragment(self): if self.searchRNGSeeder is None: return '' return ' rseed = %d ' % self.searchRNGSeeder.externalSeed() def getBootSeedFragment(self): if self.bootRNGSeeder is None: return '' return ' bseed = %d ' % self.bootRNGSeeder.externalSeed() def setConstraint(self, split, typeOfConstraint = ConstraintType.MONOPHYLETIC): if typeOfConstraint == ConstraintType.DONT_ENFORCE: if split is None: if self.constraints: self.constraints[0] = ConstraintType.DONT_ENFORCE return self.constraints = [typeOfConstraint, None] newick = None if isinstance(split, long) or isinstance(split, int): if self.taxLabels is not None: newick = splitToNewick(split, taxLabels=self.taxLabels) else: assert(self.mat is not None) newick = splitToNewick(split, nTaxa=len(self.mat), starting=1) elif isinstance(split, str): newick = split.strip() if newick.startswith('.') or newick.startswith('*'): newick = splitRepToSplit(newick) else: if not isinstance(split, Tree): raise TypeError, 'split must be long, newick string, or tree' if self.taxManager is not None: self.constraints[1] = CipresTree(newick, taxaManager=self.taxManager) else: self.constraints[1] = numberedLeafTree(newick) def setSearchSeed(self, seed): if self.searchRNGSeeder is None: self.searchRNGSeeder = RNGSeeder() self.searchRNGSeeder.seed(seed) def setBootSeed(self, seed): if self.bootRNGSeeder is None: self.bootRNGSeeder = RNGSeeder() self.bootRNGSeeder.seed(seed) def getBootDefaults(self): bootCmd = self.bootDefTemplate % self.nBootReps return bootCmd + self.getBootSeedFragment() def getBootSearchDefaults(self): return self.bootSearchDefault + self.getSearchSeedFragment() def touch(self): self.splitFreq = {} def setMatrix(self, mat): _LOG.debug('setMatrix called\n') # _LOG.debug('matrix = %s\n' % mat) self.mat = mat self.touch() def setTree(self, tree): _LOG.debug('setTree called. Converting arg to CipresTree\n') self.tree = CipresTree(tree) _LOG.debug('setTree called with arg:\n') _LOG.debug(self.tree.m_newick) self.touch() def _calcSplitsIfEmpty(self): if len(self.splitFreq) == 0: if not self.mat: raise RuntimeError, 'Failed to call setMatrix before requesting split frequency' self.bootstrapToGetSplits() def getSplitsTable(self): self._calcSplitsIfEmpty() return self.splitFreq def readPScores(self, scoreFile): pscores = readPaupPScores(scoreFile) _LOG.debug('pscores read from %s' % scoreFile) return pscores def readLScores(self, scoreFile, model, longFmt = True, demandHeader = True): lscores = readPaupLScores(scoreFile, model, demandHeader=demandHeader, longFmt=longFmt) _LOG.debug('lscores read from %s' % scoreFile) return lscores def readDScores(self, scoreFile): dscores = readPaupDScores(scoreFile) _LOG.debug('dscores read from %s' % scoreFile) return dscores def readTrees(self, treeFile): try: self.nexusReaderLock.acquire() except: sys.exit('YIKES') try: if self.exiting: return nilTree() if self.nexusReader is None: _LOG.debug('getting ReadNexus object ref') self.nexusReader = self.registry.getService(CipresIDL_api1.ReadNexus) if self.nexusReader is None: raise RuntimeError, 'Could not acquire a ReadNexus servant' try: self.nexusReader.readNexusFile(treeFile, CipresIDL_api1.ReadNexus.NEXUS_TREES_BLOCK_BIT) return self.nexusReader.getTrees(0) finally: #TEMP pass#nr, self.nexusReader = self.nexusReader, None #nr.remove() finally: self.nexusReaderLock.release() def _paupHSearch(self, startingTree = None, configCmd = ''): return self._paupAnalysis(startingTree, configCmd, searchCmd = 'HSearch') def _embedInPaupBlock(self, s): 'accepts a string or list of strings (each of which is a command). adds paup block and semicolons.''' b = [ 'BEGIN PAUP', 'log start file=data.log append', ] if isinstance(s, str): b.append(s) else: b.extend(s) b.extend([ 'log stop', 'quit', 'END;']) return ';\t\n'.join(b) def _getScoreCmdPrefix(self, criterion): if criterion == OptimalityCriterion.PARSIMONY: return 'Pscore /' if criterion == OptimalityCriterion.LIKELIHOOD: return 'Lscore /LongFmt = yes ' return 'dscore /' def getInitializationCmd(self): '''Returns a command to be placed at the beginning of a paup block to set up the env. Currently this just sets the optimality criterion.''' settings = 'Set criterion = %s noerrorbeep nokeybeep noquerybeep ; ' % OptimalityCriterion.asStr[self.criterion] constraints = self.getConstraintsCmds() if constraints: return '%s;%s' % (settings, constraints) return settings def getScoreCmd(self, filename): prefix = self._getScoreCmdPrefix(self.criterion) suffix = filename and ("scorefile = \'%s\' " % filename ) or '' return '%s %s' % (prefix, suffix) def getTerminatedEvent(self): if self._termEvent is None: self._termEvent = Event() return self._termEvent def setStatusEventConsumer(self, proxyConsumer): """Sets the new event consumer and returns the old one. In current usage within CIPRES this will return None, but the caller should restore the old event consumer, when done with a proxyConsumer to handle future developments. """ sec = self._statusEventConsumer self._statusEventConsumer = proxyConsumer self.statusEventLock.acquire() try: if self._statusEventConsumer is not None: self.event_supplier = EventSupplier(self._statusEventConsumer) else: if self.event_supplier is not None: self.event_supplier.remove() self.event_supplier = None finally: self.statusEventLock.release() return sec def notify_caller(self, msg): self.statusEventLock.acquire() try: if self.event_supplier is None: return False self.event_supplier.send_str(msg) finally: self.statusEventLock.release() return True def _paupAnalysis(self, startingTree, configCmd, searchCmd = 'HSearch'): startTreeCmd = (searchCmd.upper() == 'HSEARCH') and (startingTree is not None) and 'start = 1' or '' extInvContext = self._createTempDataFile( matrix=self.mat, tree=startingTree, supportsTaxaSubsets=self.futurePaup, dirPref='paupWrap') extraPaths = [os.path.splitext(extInvContext.filename)[0] + os.path.extsep + 'log'] termEvent = self.getTerminatedEvent() try: scoreFile, treeFile = ('score', 'tree') cmdList = [ self.getInitializationCmd(), configCmd, searchCmd + ' ' + startTreeCmd, 'sorttrees', self.getScoreCmd(scoreFile), "savetrees file = \'%s\' TaxaBlk BrLens = yes" % treeFile ] extInvContext.fileObj.write(self._embedInPaupBlock(cmdList)) extInvContext.fileObj.close() if self.event_supplier is not None: log_file_path = os.path.join(extInvContext.tempDir, "data.log") tree_th = PaupDStatusReadingThread( toCall=self.notify_caller, filename=log_file_path, stop_event=termEvent) self.tempTreesToEventsThread = tree_th self.tempTreesToEventsThread.start() red_fn = "stdouterr" extraPaths.append(red_fn) out_redirect_fp = os.path.join(extInvContext.tempDir, red_fn) out_redirect = open(out_redirect_fp, "w") self._callProcess(extInvContext.tempDir, [self.paupPath, '-n', str(extInvContext.filename)], out_redirection=out_redirect) _LOG.debug("PAUP* finished") termEvent.set() # this tells the output reading threads to die. self._termEvent = None scoreFile = os.path.join(extInvContext.tempDir, scoreFile) treeFile = os.path.join(extInvContext.tempDir, treeFile) extraPaths.append(scoreFile) extraPaths.append(treeFile) if not os.path.exists(scoreFile): if self.event_supplier is not None: if os.path.exists(log_file_path): c = open(log_file_path, "rU").read() self.notify_caller("PAUP* LOG (with any error messages) follows:\n\n%s" % c) else: self.notify_caller("PAUP* failed to return a tree or leave an error log") return [] if self.criterion == OptimalityCriterion.PARSIMONY: scores = self.readPScores(scoreFile) elif self.criterion == OptimalityCriterion.LIKELIHOOD: # readLScores return [[lnL, model], [lnL, model]...] # here we pull out just the score. scores = [i[0] for i in self.readLScores(scoreFile, self.model, longFmt = True, demandHeader = True)] else: scores = self.readDScores(scoreFile) treesToReturn = self.readTrees(treeFile) finally: if not self.debugging: self.cleanupTemporaries(extInvContext.tempDir, [extInvContext.filename] + extraPaths) else: _LOG.debug('leaving the temporary directory %s (running in debug mode)' % extInvContext.tempDir) extTrans = not self.futurePaup and extInvContext.fromExternalTranslation or None nTrees = len(treesToReturn) scoredTrees = [addScoreAndMapLeaves(treesToReturn[i], scores[i], self.criterion, extTrans) for i in range(nTrees)] return [self.convertTreeFunc(t) for t in scoredTrees] def evaluateTree(self, proxyConsumer): prevSEC = self.setStatusEventConsumer(proxyConsumer) try: try: _LOG.debug('evaluateTree called') if not (self.mat and self.tree): raise RuntimeError, 'Failed to call setTree and setMatrix before evaluateTree' return self._paupAnalysis(self.tree, self.treeEvaluateDefault + ' ; ' + self.userCommands(), searchCmd = '')[0] except: logException(_LOG) return nilTree() finally: self.setStatusEventConsumer(prevSEC) def improveTree(self, proxyConsumer): _LOG.debug('improveTree called') treeList = self.improveTreeReturnAll(proxyConsumer) if len(treeList) < 1: return nilTree() return treeList[0] def inferTree(self, proxyConsumer): _LOG.debug('inferTree called') treeList = self.inferTreeReturnAll(proxyConsumer) if len(treeList) < 1: return nilTree() return treeList[0] def improveTreeReturnAll(self, proxyConsumer): prevSEC = self.setStatusEventConsumer(proxyConsumer) try: try: _LOG.debug('in improveTreeReturnAll') if not (self.mat and self.tree): raise RuntimeError, 'Failed to call setTree and setMatrix before improveTree' return self._paupHSearch(self.tree, self.treeImproveDefault + ' ; ' + self.userCommands()) except: logException(_LOG) return [] finally: self.setStatusEventConsumer(prevSEC) def inferTreeReturnAll(self, proxyConsumer): prevSEC = self.setStatusEventConsumer(proxyConsumer) try: try: _LOG.debug('inferTree called') if not self.mat: raise RuntimeError, 'Failed to call setMatrix before inferTree' return self._paupHSearch(None, self.treeInferDefault + ' ; ' + self.userCommands()) except: logException(_LOG) return nilTree() finally: self.setStatusEventConsumer(prevSEC) def bootstrapToGetSplits(self): extInvContext = self._createTempDataFile( matrix=self.mat, tree=None, supportsTaxaSubsets=self.futurePaup, dirPref='paupWrap') log_fn = os.path.splitext(extInvContext.filename)[0] + os.path.extsep + 'log' extraPaths = [log_fn, self.bootFreqFile] try: cmdList = [ self.getInitializationCmd(), self.getBootSearchDefaults(), 'log stop', 'log file=%s start replace' % self.bootFreqFile, self.getBootDefaults(), self.userCommands(), 'boot', 'log stop', 'log file = data.log append start', ] extInvContext.fileObj.write(self._embedInPaupBlock(cmdList)) extInvContext.fileObj.close() self._callProcess(extInvContext.tempDir, [self.paupPath, '-n', str(extInvContext.filename)] ) # commandline or extInvContext.filename limit in paup means that we have to chdir and use relative paths splitsFile = os.path.join(extInvContext.tempDir, self.bootFreqFile) try: self.splitFreq = {} self.splitFreq = readPAUPLogForSplitFreqs(splitsFile) except SplitsNotFoundError: pass finally: if not self.debugging: self.cleanupTemporaries(extInvContext.tempDir, [extInvContext.filename] + extraPaths) else: _LOG.debug('leaving the temporary directory %s (running in debug mode)' % extInvContext.tempDir) def enforceConstraint(self, constraint, demandSplits = True): '''Adds a split of tree to the constaints. if "demandSplits" is false, then a converse constraint is used. Only affects searching (not tree evaluation)''' self.constraints = [] def getConstraintsCmds(self): if (not self.constraints) or self.constraints[0] == ConstraintType.DONT_ENFORCE: return '' t = self.constraints[0] conCmd = 'constraints theConstraint (%s) = %s' % (ConstraintType.toPaupType[t], self.constraints[1].getNewick()) searchCmd = 'default hsearch enforce constraints=theConstraint converse=%s' % (ConstraintType.isConverse(t) and 'yes' or 'no') return '%s;%s' %(conCmd, searchCmd) def _getPathToPaupFromArgv(argv): PaupWrap.debugging = '-debug' in argv if len(argv) < 2: _LOG.error(' '.join(argv)) raise ValueError, 'Expecting the path to Paup as an argument' return argv[1] def createPaupWrap(registry, argv): return PaupWrap(registry, _getPathToPaupFromArgv([0] + argv)) # we add an empty element to act like the script name in a sys.argv list def defaultPaupWrap(): '''Returns a MBTwoExpWrap object with a getCipresRegistry, and path to mbTwoExp from the environment.''' return PaupWrap(getCipresRegistry(), getCipresProperty('paupPath')) if __name__=='__main__': argv = sys.argv paupPath = _getPathToPaupFromArgv(argv) def PaupWrapFactory(registry, pathToPaup = paupPath): return PaupWrap(registry, pathToPaup) try: interfaceDict = {} for k in ['TreeEvaluate', 'TreeImprove', 'TreeInfer']: interfaceDict[k] = PaupWrapFactory cipresServe(argv, interfaceDict) except: logException(_LOG)