#!/usr/bin/python """Currently implements tree searching via a file-based wrapper around GARLIv0.951""" import os import re import sys from threading import Lock, Thread, Event import time from PIPRes.util.server_import import * from PIPRes.wrap.external_program import * from PIPRes.util.cipres import getCipresProperty from PIPRes.util.io import cipresGetLogger from PIPRes.event_consumer import LineReadingThread, EventSupplier, LinesToEventsThread from PIPRes.cipres_types import nilTree _LOG = cipresGetLogger('garli_wrap.py') from cStringIO import StringIO _garliRMatPat =re.compile(r'.*[rR]\s*([^rfapiRFAPI]+)\s*([rfapiRFAPI].*)') _garliFreqPat = re.compile(r'.*[fF]\s*([^rfapiRFAPI]+)\s*([rfapiRFAPI].*)') _garliShapePat = re.compile(r'.*[aA]\s*([.0-9]+)\s*([rfapiRFAPI].*)') _garliPInvarPat = re.compile(r'.*[iIpP]\s*([.0-9]+)\s*([rfapiRFAPI].*)') class GarliModel: def __init__(self, garliFmt=None, **kwargs): """A nucleotide model description. valid kwargs: rMat, freqs, gammaShape, pInvar, garliFmt""" self.nStates = 4 if garliFmt: #ASK DZ m = _garliRMatPat.match(garliFmt) rMat = m and m.group(1).strip().split() or None m = _garliFreqPat.match(garliFmt) freqs = m and m.group(1).strip().split() or None m = _garliShapePat.match(garliFmt) self.gammaShape = not m and 0.5 or float(m.group(1)) m = _garliPInvarPat.match(garliFmt) self.pInvar = m and float(m.group(1)) or 0.0 else: rMat = kwargs.get('rMat') freqs = kwargs.get('freqs') self.gammaShape = kwargs.get('gammaShape', 0.5) self.pInvar = kwargs.get('pInvar', 0.0) rMat = rMat or [1.0, 5.0, 1.0, 1.0, 5.0] if len(rMat) == 6: gtRate = rMat[5] self.relRates = [i/gtRate for i in rates[:5]] else: assert(len(rMat) == 5) self.relRates = rMat freqs = freqs or [.25, .25, .25] notLastFreq = sum(freqs[:self.nStates - 1]) assert(len(freqs) > self.nStates - 2) self.freqs = freqs if len(freqs) == self.nStates: sf = notLastFreq + freqs[self.nStates - 1] assert(approxEqual(sf, 1.0)) else: assert(len(freqs) == self.nStates - 1) self.freqs.append(1.0 - notLastFreq) for f in self.freqs: assert(f >= 0.0) for r in self.relRates: assert(r >= 0.0) assert(self.pInvar >= 0.0 and self.pInvar <= 1.0) assert(self.gammaShape > 0) def __str__(self): return 'r %s b %s a %f p %f ' % ( ' '.join(map(str, self.relRates)), ' '.join(map(str, self.freqs)), self.gammaShape, self.pInvar, ) class GarliLatestTreeThread(LineReadingThread): """A thread that will read the input stream and call an function with the tree strings as they appear. `toCall` passing it (tree, score, model_str) as they appear when run is is called. The thread terminates if: - "end;.*" is encountered as a line. - the stop Event is set NOTE: this assumes that GARLI always writes the taxa in normal (ascending) order so that no translation needs to be done to get to the implied NEXUS numbering of taxa. """ _garliNewBestTreeRE = re.compile(r"\s*tree\s+(\S+)\s*=\s*\[..\]\s*\[([-0-9.]*)\s+mut=[^\]]*\]\[(.*)\]\s*(\S*);") def __init__(self, toCall, stream=None, filename="", stop_event=None, READ_SLEEP_INTERVAL=0.5): """`toCall` is the callable that takes a (raw) tree string, score and model and returns a False value to terminate the reading. see LineReadingThread for other args """ self.toCall = toCall if not callable(self.toCall): raise ValueError("toCall arg must be callable") LineReadingThread.__init__(self, lineCallback=None, stream=stream, filename=filename, stop_event=stop_event, READ_SLEEP_INTERVAL=READ_SLEEP_INTERVAL) def keep_going(self, line): m = GarliLatestTreeThread._garliNewBestTreeRE.match(line) if m: g = m.groups() score = float(g[1]) model = g[2].strip() tree = g[3] return self.toCall(tree, score, model) c = line.strip() if c.lower().startswith("end;"): return False return True class GarliWrap(object, CipresIDL_api1__POA.TreeImprove, CipresIDL_api1__POA.TreeInfer, SimpleExternalWrapperServer): general_opts_and_d = [ ("datafname", 'inf'), ("constraintfile", "none"), #no ui ("streefname", 'random'), ("ofprefix", 'outf'), #no ui ("randseed", -1), ("availablememory", 512), ("logevery", 1000), #no ui ("saveevery", 1000), #no ui ("refinestart", 1), ("outputeachbettertopology", 0), # no ui ("enforcetermconditions", 1), ("genthreshfortopoterm", 10000), ("scorethreshforterm", 0.05), ("significanttopochange", 0.01), ("outputphyliptree", 0), #no ui ("outputmostlyuselessfiles", 0), #no ui ("writecheckpoints", 0), #no ui ("restart", 0), #no ui ("ratematrix", "6rate"), ("statefrequencies", "estimate"), ("ratehetmodel", "gamma"), ("numratecats", 4), ("invariantsites", "estimate"), ] master_opts_and_d = [ ("nindivs", 4), ("holdover", 1), ("selectionintensity", .5), ("holdoverpenalty", 0), ("stopgen", 5000000), ("stoptime", 5000000), ("startoptprec", 0.5), ("minoptprec", 0.01), ("numberofprecreductions", 20), ("treerejectionthreshold", 50.0), ("topoweight", 1.0), ("modweight", 0.05), ("brlenweight", 0.2), ("randnniweight", 0.1), ("randsprweight", 0.3), ("limsprweight", 0.6), ("intervallength", 100), ("intervalstostore", 5), ("limsprrange", 6), ("meanbrlenmuts", 5), ("gammashapebrlen", 1000), ("gammashapemodel", 1000), ("uniqueswapbias", 0.1), ("distanceswapbias", 1.0), ("bootstrapreps", 0), ("inferinternalstateprobs", 0), ] generalOpts = [i[0] for i in general_opts_and_d] masterOpts = [i[0] for i in master_opts_and_d] defaultConfig = dict(general_opts_and_d + master_opts_and_d) _defaultTag = 'outputtag' debugging = False def __init__(self, registry, garliPath): self.garliPath = garliPath self.debugging = GarliWrap.debugging SimpleExternalWrapperServer.__init__(self, registry) self.convertTreeFunc = self.orb is None and toTreeBridge or toIDLTree self.registry = registry self.mat = None self.tree = None self.userConfigCmd = {} self.config = dict(GarliWrap.defaultConfig) self.model = GarliModel() self.ignoreStartEdgeLen = False self._toCallerTranslator = None self.translationObjLock = Lock() # lock to assure single thread access to translation data structs # unused because it is a pain to keep these streams from clogging and # GARLI saves a screen log anyway #self.statusToEventsThread = None # thread for reading stdin and stdout and turning them into string events # not needed as long as statusToEventsThread is unused self.statusEventLock = Lock() # lock to assure single thread access to the event_supplier that posts self.event_supplier = None # self._statusEventConsumer = None # consumer that the event_supplier communicates with self.tempTreesToEventsThread = None # thread for sending tree events self._termEvent = None # threading.Event object that is used by the EventsThreads to check if GARLI is still running. self._tmp_tree_num = 0 def setMatrix(self, mat): _LOG.debug('setMatrix called\n') self.mat = mat def parseCommandToDict(self, cmd): """Returns the key value pairs from parsing the GARLI command. The wrapper command "ignoreStartEdgeLen" is dealt with by updating self.ignoreStartEdgeLen rather than including the option in the dict. """ _LOG.debug('parsing %s' % cmd) cmdStream = cmd.split(';') d = {} for c in cmdStream: s = c.strip() if s and (not s.startswith('[')): spl = s.split('=') if len(spl) < 2: raise ValueError('Expecting = in GARLI configuration command. Found:\n%s\n' % c) key = spl[0].strip().lower() val = '='.join(spl[1:]).strip() if key.lower() == "ignorestartedgelen": v = val.lower() if v == "y" or v == "yes" or v == "1": self.ignoreStartEdgeLen = True elif v == "n" or v == "no" or v == "0": self.ignoreStartEdgeLen = False else: raise ValueError('The value of ignoreStartEdgeLen should be "yes" or "no" (found %s)' % val) else: d[key] = val return d def execute(self, cmd): super(GarliWrap, self).execute(cmd) # call super so that self.userConfigCmdList is updated (for logging) try: newCmdDict = self.parseCommandToDict(cmd) self.userConfigCmd.update(newCmdDict) self.config.update(newCmdDict) except ValueError, x: return False, str(x) return True, '' def setTree(self, tree): _LOG.debug('setTree called. Converting arg to CipresTree and refining\n') self.tree = CipresTree(tree) # garli requires binary input trees: self.tree.randomRefine() _LOG.debug('setTree called with arg:\n') _LOG.debug(self.tree.m_newick) def getTerminatedEvent(self): if self._termEvent is None: self._termEvent = Event() return self._termEvent def setStatusEventConsumer(self, proxyConsumer): """Sets the new event consumer and returns the old one. In current usage within CIPRES this will return None, but the caller should restore the old event consumer, when done with a proxyConsumer to handle future developments. """ sec = self._statusEventConsumer self._statusEventConsumer = proxyConsumer if proxyConsumer is None: self.config["outputeachbettertopology"] = "0" else: self.config["outputeachbettertopology"] = "1" self.statusEventLock.acquire() try: if self._statusEventConsumer is not None: self.event_supplier = EventSupplier(self._statusEventConsumer) else: if self.event_supplier is not None: self.event_supplier.remove() self.event_supplier = None finally: self.statusEventLock.release() return sec def improveTree(self, proxyConsumer): _LOG.debug('improveTree called') treeList = self.improveTreeReturnAll(proxyConsumer) if len(treeList) < 1: return nilTree() return treeList[0] def inferTree(self, proxyConsumer): _LOG.debug('inferTree called') treeList = self.inferTreeReturnAll(proxyConsumer) if len(treeList) < 1: return nilTree() return treeList[0] def improveTreeReturnAll(self, proxyConsumer): _LOG.debug('in improveTreeReturnAll') prevSEC = self.setStatusEventConsumer(proxyConsumer) try: if not (self.mat and self.tree): raise RuntimeError, 'Failed to call setTree and setMatrix before improveTree' return self._garliSearch(self.tree) finally: self.setStatusEventConsumer(prevSEC) def inferTreeReturnAll(self, proxyConsumer): _LOG.debug('in inferTreeReturnAll') prevSEC = self.setStatusEventConsumer(proxyConsumer) try: if not self.mat: raise RuntimeError, 'Failed to call setMatrix before inferTree' return self._garliSearch(None) finally: self.setStatusEventConsumer(prevSEC) def writeConfFile(self, file_obj): genOptsStr = '\n'.join(['%s = %s' % (i, str(self.config[i])) for i in GarliWrap.generalOpts]) masterOptsStr = '\n'.join(['%s = %s' % (i, str(self.config[i])) for i in GarliWrap.masterOpts]) file_obj.write("[general]\n%s\n[master]\n%s\n" % (genOptsStr, masterOptsStr)) def _process_tmp_tree(self, tree, score, model): """Return value is True to indicate more trees can be handled. """ ct = CipresTree(tree) t = self._map_tree_to_clients_names(ct, score) #_LOG.debug("sending %s" % t.m_newick) self._tmp_tree_num = 1 + self._tmp_tree_num t.m_name = "GARLI_tree_%d" % self._tmp_tree_num self.statusEventLock.acquire() try: if self.event_supplier is None: return False self.event_supplier.send_tree(t) finally: self.statusEventLock.release() return True def _map_tree_to_clients_names(self, tree, score): try: _LOG.debug("Translating tree") self.translationObjLock.acquire() t = addScoreAndMapLeaves(tree, score, OptimalityCriterion.LIKELIHOOD, self._toCallerTranslator) fixed_tree = self.convertTreeFunc(t) return fixed_tree finally: self.translationObjLock.release() _LOG.debug("done translating tree") def notify_caller(self, msg): self.statusEventLock.acquire() try: if self.event_supplier is None: return False self.event_supplier.send_str(msg) finally: self.statusEventLock.release() return True def _garliSearch(self, startTree): # @todo not thread safe until we deal with the # fact that _createTempDataFile stores the tree in self.mappedTree # for now, we'll just make sure that calls _garliSearch are serialized assert(self.__dict__.get('mappedTree') is None) extInvContext = self._createTempDataFile( matrix=self.mat, tree=startTree, dirPref='garliWrap', matrixFormat=MatrixFormat.RelaxedPHYLIP, treeForTaxaOnly=True ) self._toCallerTranslator = extInvContext.fromExternalTranslation extInvContext.matrixFileObj.close() confFileName = 'garli.conf' outTag = self.config['ofprefix'] screenLog = outTag + ".screen.log" bestTreeFileName = outTag + '.best.tre' extraPaths = [screenLog, bestTreeFileName, confFileName, extInvContext.matrixFilename, extInvContext.matrixFilename + '.comp'] termEvent = self.getTerminatedEvent() if self.config.get("outputeachbettertopology", 0) == "1": improvingTreeFilename = outTag + ".treelog00.log" extraPaths.extend(improvingTreeFilename) if self.event_supplier is not None: self._tmp_tree_num = 0 tree_th = GarliLatestTreeThread(toCall=self._process_tmp_tree, filename=improvingTreeFilename, stop_event=termEvent) self.tempTreesToEventsThread = tree_th self.tempTreesToEventsThread.start() if startTree is not None: extInvContext.treeFilename = 'start.tre' extInvContext.treeFileObj = open(os.path.join(extInvContext.tempDir, extInvContext.treeFilename), 'w') if self.ignoreStartEdgeLen: elc = TreeDisplay.noEdgeLengths else: elc = TreeDisplay.withEdgeLengthsIfPresent startNewick = self.mappedTree.getNewick(edgeLenCode=elc) writeStartingModel = False if writeStartingModel: extInvContext.treeFileObj.write("%s %s;\n" % (str(self.model), startNewick)) else: extInvContext.treeFileObj.write("%s;\n" % (startNewick)) extInvContext.treeFileObj.close() extraPaths.append(extInvContext.treeFilename) else: extInvContext.treeFilename = None numberedPaths = [outTag + '.log%.02d.log', outTag + '.treelog%02d.log'] try: cfg = None try: cfg = open(os.path.join(extInvContext.tempDir, confFileName), 'w') self.config['datafname'] = extInvContext.matrixFilename self.config['streefname'] = extInvContext.treeFilename is not None and extInvContext.treeFilename or 'random' self.writeConfFile(cfg) finally: if cfg is not None: cfg.close() _LOG.debug('Calling %s from %s' % (self.garliPath, extInvContext.tempDir)) rc = self._callProcess(extInvContext.tempDir, [self.garliPath]) _LOG.debug("GARLI finished") termEvent.set() # this tells the output reading threads to die. self._termEvent = None end_tree_fn = os.path.join(extInvContext.tempDir, bestTreeFileName) if not os.path.exists(end_tree_fn): abs_screen_fn = os.path.join(extInvContext.tempDir, screenLog) if self.event_supplier is not None: if os.path.exists(abs_screen_fn): c = open(abs_screen_fn, "rU").read() self.notify_caller("GARLI LOG (with any error messages) follows:\n\n%s" % c) else: self.notify_caller("GARLI failed to return a tree or leave an error log") return [] sco_mod_tree_lists = parseGarliBestTreeFile(end_tree_fn) for sco_mod_tree in sco_mod_tree_lists: sco_mod_tree[2] = CipresTree(sco_mod_tree[2]) finally: self.mappedTree = None #@todo if not self.debugging: self.cleanupTemporaries(extInvContext.tempDir, extraPaths, numberedPaths) else: _LOG.debug('leaving the temporary directory %s (running in debug mode)' % extInvContext.tempDir) returned_trees = [] for sco_mod_tree in sco_mod_tree_lists: score = sco_mod_tree[0] treeToReturn = sco_mod_tree[2] t = self._map_tree_to_clients_names(treeToReturn, score) returned_trees.append(t) return returned_trees def createGarliWrap(registry, argv): return GarliWrap(registry, _getPathToPaupFromArgv([0] + argv)) # we add an empty element to act like the script name in a sys.argv list def defaultGarliWrap(): """Returns a GarliWrap object with a getCipresRegistry.""" return GarliWrap(getCipresRegistry(), getCipresProperty('registryGarli')) _garliBestTreeRE = re.compile(r'^tree\s+(\S+)\s*=\s*\[..\]\[([-0-9.]+)\]\[(.*)\]\s*(\S*);') def parseGarliBestTreeStream(stream): """Return (score, model, treeString) from a file like object that follows the format of a Garli *.best.tre file. NOTE: this assumes that GARLI always writes the taxa in normal (ascending) order so that no translation needs to be done to get to the implied NEXUS numbering of taxa. """ global _garliBestTreeRE sco_mod_tree_lists = [] for line in stream: m = _garliBestTreeRE.match(line) if m: g = m.groups() score = float(g[1]) model = g[2].strip() tree = g[3] tt = [score, model, tree] sco_mod_tree_lists.append(tt) if len(sco_mod_tree_lists) == 0: raise ValueError, 'did not find tree.' return sco_mod_tree_lists def parseGarliBestTreeFile(filepath): "Return list of [score, model, treeString] from a Garli *.best.tre file" inStream = open(filepath, 'rU') try: return parseGarliBestTreeStream(inStream) finally: inStream.close() def _getPathToGarliFromArgv(argv): GarliWrap.debugging = '-debug' in argv if len(argv) < 2: _LOG.error(' '.join(argv)) raise ValueError, 'Expecting the path to Garli as an argument' gp = argv[1] if not os.path.exists(gp): raise ValueError, 'Expecting the path to Garli as an argument' return gp if __name__=='__main__': argv = sys.argv garliPath = _getPathToGarliFromArgv(argv) def GarliWrapFactory(registry, pathToGarli = garliPath): return GarliWrap(registry, pathToGarli) try: interfaceDict = {} for k in ['TreeEvaluate', 'TreeImprove', 'TreeInfer']: interfaceDict[k] = GarliWrapFactory cipresServe(argv, interfaceDict) except: logException(_LOG) ############################################################################### # What follows are the GARLI0.941 settings and defaults: ############################################################################### # # generalOpts = [ # ("datafname",'inf'), # ("streefname",'random'), # ("ofprefix",'outf'), # ("randseed",-1), # ("megsclamemory",500), # ("availablememory",512), # ("logevery",10), # ("saveevery",100), # ("refinestart",1), # ("outputeachbettertopology",0), # ("enforcetermconditions",1), # ("genthreshfortopoterm",2000), # ("scorethreshforterm",.05), # ("significanttopochange",0.05), # ("outputphyliptree",0), # ("outputmostlyuselessfiles",0), # ("dontinferproportioninvariant",0), # ] # # masterOpts = [ # ("nindivs",4), # ("holdover",1), # ("selectionintensity",.5), # ("holdoverpenalty",0), # ("stopgen",5000000), # ("stoptime",5000000), # ("startoptprec",.5), # ("minoptprec",.01), # ("numberofprecreductions",40), # ("topoweight",1.0), # ("modweight",.05), # ("brlenweight",.2), # ("randnniweight",.2), # ("randsprweight",.3), # ("limsprweight",.5), # ("intervallength",100), # ("intervalstostore",5), # ("limsprrange",6), # ("meanbrlenmuts",5), # ("gammashapebrlen",1000), # ("gammashapemodel",1000), # ("bootstrapreps",0), # ("inferinternalstateprobs",0), # ] # ############################################################################### # End GARLI0.941 settings ###############################################################################