#!/usr/bin/python """Currently implements tree searching via a file-based wrapper around RAxML-VI-HPC-1.0 Default invocation is equivalent to: raxmlHPC -f d -m GTRMIX -n outputtag -s infile -t treefile""" import os import re import time from threading import Event, Thread, Lock from PIPRes.util.server_import import * from PIPRes.cipres_types import * from PIPRes.wrap.external_program import SimpleExternalWrapperServer, MatrixFormat, readTreeFromPhylipFile, addScoreAndMapLeaves from PIPRes.wrap.output_parser import * from PIPRes.util.cipres import getCipresProperty from PIPRes.util.io import cipresGetLogger from PIPRes.event_consumer import EventSupplier _LOG = cipresGetLogger('org.cipres.raxml.raxml_wrap.py') _some_whitespace = re.compile(r"\s+") _argless_opts = 'dhjvy' _arg_opts = 'bceim#' _unsupported_opts = 'afgnqrstwz' def readRaxmlLogFilepath(fp): "Takes the name of a RAxML_log.x file, and returns of list of pairs of floats: [[time, score], ] for the iterations" inStream = open(fp, 'rU') try: return readRaxmlLogStream(inStream) finally: inStream.close() def readRaxmlLogStream(inStream): "Takes a file object (referring to RAxML_log.x file), and returns of list of pairs of floats: [[time, score], ] for the iterations" result = [] for line in inStream: lineList = line.split() assert(len(lineList) > 1) # every line should have at least: time score result.append([float(lineList[0]), float(lineList[1])]) return result # This is the column (space-separated) index of the score within a RAxML log file. _SCORE_IN_LOG = 1 class RaxmlCheckpointThread(Thread): """A thread that will read the RAxML checkpoint files. """ def __init__(self, toCall, tax_labels, output_tag, dir=os.curdir, stop_event=None, READ_SLEEP_INTERVAL=0.5): """`toCall` is the callable that takes a (raw) tree string, score and model and returns a False value to terminate the reading. see LineReadingThread for other args """ self.toCall = toCall if not callable(self.toCall): raise ValueError("toCall arg must be callable") self.dir = dir self.output_tag = output_tag self.curr_tree = 0 self.sleep_interval = READ_SLEEP_INTERVAL self.tax_labels = tax_labels self.stop_event = stop_event Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}) def run(self): checkpoint_fn_pref = "RAxML_checkpoint.%s" % self.output_tag checkpoint_pref = os.path.join(self.dir, checkpoint_fn_pref) score_fn = "RAxML_log.%s" % self.output_tag score_file = os.path.join(self.dir, score_fn) while True: fn = "%s.%d" % (checkpoint_pref, self.curr_tree) tree = None while tree is None: if (self.stop_event is not None) and self.stop_event.isSet(): return if not os.path.exists(fn) and not self._wait_for_file_to_appear(fn): return try: tree = readTreeFromPhylipFile(fn, taxLabels=self.tax_labels) _LOG.debug(tree) except BadTreeDefError, x: logException(_LOG) if tree is None: time.sleep(self.sleep_interval) score = None while score is None: if (self.stop_event is not None) and self.stop_event.isSet(): return if not self._wait_for_file_to_appear(score_file): return try: log_lines = readRaxmlLogFilepath(score_file) if len(log_lines) > self.curr_tree: score = float(log_lines[self.curr_tree][_SCORE_IN_LOG]) except: pass if score is None: time.sleep(self.sleep_interval) self.curr_tree += 1 tree.setName("RAxML_tree_%d" % self.curr_tree) self.toCall(tree, score, "") def _wait_for_file_to_appear(self, filename): while True: if os.path.exists(filename): return True if (self.stop_event is not None) and self.stop_event.isSet(): return False time.sleep(self.sleep_interval) def _wait_for_file(self, filename): if self._wait_for_file_to_appear(filename): return open(filename, "rU") return None class RaxmlWrap(object, CipresIDL_api1__POA.TreeImprove, CipresIDL_api1__POA.TreeInfer, SimpleExternalWrapperServer): _defaultTag = 'outputtag' debugging = True def __init__(self, registry, raxmlPath): self.raxmlPath = raxmlPath self.debugging = RaxmlWrap.debugging SimpleExternalWrapperServer.__init__(self, registry) self.convertTreeFunc = self.orb is None and toTreeBridge or toIDLTree self.registry = registry self.mat = None self.tree = None self.user_cmd_dict = {} self.execute('-m GTRMIX') self.writeCheckpoints = False self._toCallerTranslator = None self.translationObjLock = Lock() # lock to assure single thread access to translation data structs self.statusEventLock = Lock() # lock to assure single thread access to the event_supplier that posts self.event_supplier = None # self._statusEventConsumer = None # consumer that the event_supplier communicates with self.tempTreesToEventsThread = None # thread for sending tree events self._termEvent = None # threading.Event object that is used by the EventsThreads to check if GARLI is still running. def _parse_args(self, arg_iter): """Walks through the iterable `arg_iter` storing the args in parsed fields in `self.user_cmd_dict` and returning the (boolean, message) tuple that `self.execute` returns. """ try: while True: c = arg_iter.next() if not (len(c) == 2 and c.startswith('-')): m = 'Expecting a flag character preceded by an -. '\ 'Found "%s"' % c return (False, m) flag = c[1] if flag in _argless_opts: self.user_cmd_dict[c] = None elif flag in _arg_opts: try: a = arg_iter.next() except: return (False, "Expecting argument after %s" % c) self.user_cmd_dict[c] = a elif flag in _unsupported_opts: return (False, "The option %s is not supported in "\ "the RAxML wrapper" % c) else: return (False, "Unknown option %s" % c) except StopIteration: pass return (True, '') def _get_cmd_line_args(self): cList = [] for k, v in self.user_cmd_dict.iteritems(): if v is None: cList.append(k) else: cList.extend([k, v]) return ' '.join(cList) def execute(self, cmd): global _some_whitespace super(RaxmlWrap, self).execute(cmd) # call super so that self.userConfigCmdList is updated (for logging) sc = cmd.strip() tok_list = _some_whitespace.split(sc) arg_iter = iter(tok_list) r = self._parse_args(arg_iter) _LOG.debug(self.user_cmd_dict) return r def setMatrix(self, mat): _LOG.debug('setMatrix called\n') # _LOG.debug('matrix = %s\n' % mat) self.mat = mat def setTree(self, tree): _LOG.debug('setTree called. Converting arg to CipresTree and refining\n') self.tree = CipresTree(tree) #raxml requires binary input trees: self.tree.randomRefine() _LOG.debug('setTree called with arg:\n') _LOG.debug(self.tree.m_newick) def improveTreeReturnAll(self, proxyConsumer): _LOG.debug('improveTreeReturnAll called: TEMPORARY implementation!!!') #should actually check to see if RAxML returns multiple trees # this is a stop gap r = self.improveTree(proxyConsumer) if r is None: return [] return [r] def inferTreeReturnAll(self, proxyConsumer): _LOG.debug('inferTreeReturnAll called: TEMPORARY implementation!!!') #should actually check to see if RAxML returns multiple trees # this is a stop gap r = self.inferTree(proxyConsumer) if r is None: return [] return [r] def improveTree(self, proxyConsumer): _LOG.debug('improveTree called') if not (self.mat and self.tree): raise RuntimeError, 'Failed to call setTree and setMatrix before improveTree' self.writeCheckpoints = proxyConsumer is not None prevSEC = self.setStatusEventConsumer(proxyConsumer) try: return self._raxmlSearch(self.tree, self._get_cmd_line_args()) finally: self.setStatusEventConsumer(prevSEC) def inferTree(self, proxyConsumer): _LOG.debug('inferTree called') if not self.mat: raise RuntimeError, 'Failed to call setMatrix before inferTree' self.writeCheckpoints = proxyConsumer is not None prevSEC = self.setStatusEventConsumer(proxyConsumer) try: return self._raxmlSearch(None, self._get_cmd_line_args()) finally: self.setStatusEventConsumer(prevSEC) def getTerminatedEvent(self): if self._termEvent is None: self._termEvent = Event() return self._termEvent def setStatusEventConsumer(self, proxyConsumer): """Sets the new event consumer and returns the old one. In current usage within CIPRES this will return None, but the caller should restore the old event consumer, when done with a proxyConsumer to handle future developments. """ sec = self._statusEventConsumer self._statusEventConsumer = proxyConsumer self.statusEventLock.acquire() try: if self._statusEventConsumer is not None: self.event_supplier = EventSupplier(self._statusEventConsumer) else: if self.event_supplier is not None: self.event_supplier.remove() self.event_supplier = None finally: self.statusEventLock.release() return sec def _process_tmp_tree(self, tree, score, model): """Return value is True to indicate more trees can be handled. """ t = self._map_tree_to_clients_names(tree, score) self.statusEventLock.acquire() try: if self.event_supplier is None: return False self.event_supplier.send_tree(t) finally: self.statusEventLock.release() return True def _map_tree_to_clients_names(self, tree, score): try: _LOG.debug("Translating tree") self.translationObjLock.acquire() t = addScoreAndMapLeaves(tree, score, OptimalityCriterion.LIKELIHOOD, self._toCallerTranslator) fixed_tree = self.convertTreeFunc(t) return fixed_tree finally: self.translationObjLock.release() _LOG.debug("done translating tree") def notify_caller(self, msg): self.statusEventLock.acquire() try: if self.event_supplier is None: return False self.event_supplier.send_str(msg) finally: self.statusEventLock.release() return True def _raxmlSearch(self, startTree, configCmd): _LOG.debug("RAxML search with user options %s" % configCmd) extInvContext = self._createTempDataFile( matrix=self.mat, tree=startTree, dirPref='raxmlWrap', matrixFormat=MatrixFormat.RelaxedPHYLIP ) self._toCallerTranslator = extInvContext.fromExternalTranslation extraPaths = [] extInvContext.matrixFileObj.close() extraPaths.append(extInvContext.matrixFilename) if startTree is not None: extInvContext.treeFileObj.close() startTreeCmdList = ['-t', extInvContext.treeFilename ] extraPaths.append(extInvContext.treeFilename) else: startTreeCmdList = [] outtag = RaxmlWrap._defaultTag raxmlVariableOuputs = ['log', 'result', 'info', 'parsimonyTree', 'bootstrap'] extraPaths.extend(['RAxML_%s.%s' % (i, outtag) for i in raxmlVariableOuputs]) extraPaths.append("stdouterr") numberedPaths = ['RAxML_checkpoint.' + outtag + '.%d'] termEvent = self.getTerminatedEvent() try: searchParams = ['-f', 'd'] # "-f d" means do tree searching, "-f e" is tree evaluating. a = [self.raxmlPath] + searchParams + configCmd.split() + ['-n', outtag, '-s', extInvContext.matrixFilename] + startTreeCmdList if self.writeCheckpoints: a.append("-j") tax_labels = list(extInvContext.treeTaxLabels) tree_th = RaxmlCheckpointThread( toCall=self._process_tmp_tree, tax_labels=tax_labels, output_tag=outtag, dir=extInvContext.tempDir, stop_event=termEvent) self.tempTreesToEventsThread = tree_th self.tempTreesToEventsThread.start() red_fn = "stdouterr" out_redirect_fp = os.path.join(extInvContext.tempDir, red_fn) out_redirect = open(out_redirect_fp, "w") self._callProcess( extInvContext.tempDir, a, out_redirection=out_redirect) out_redirect.close() _LOG.debug("RAxML finished") termEvent.set() # this tells the output reading threads to die. self._termEvent = None log_fn = os.path.join(extInvContext.tempDir, 'RAxML_log.' + outtag) if not os.path.exists(log_fn): if self.event_supplier is not None: if os.path.exists(out_redirect_fp): c = open(out_redirect_fp, "rU").read() self.notify_caller("RAxML LOG (with any error messages) follows:\n\n%s" % c) else: self.notify_caller("RAxML failed to return a tree or leave an error log") return None scores = scoresFromRaxml(log_fn) finalTreePath = os.path.join(extInvContext.tempDir, 'RAxML_result.%s' % outtag) treeToReturn = readTreeFromPhylipFile(finalTreePath, taxLabels=extInvContext.treeTaxLabels) _LOG.debug(str(treeToReturn)) _LOG.debug(treeToReturn.m_newick) finally: if not self.debugging: self.cleanupTemporaries(extInvContext.tempDir, extraPaths, numberedPaths) else: _LOG.debug('leaving the temporary directory %s (running in debug mode)' % extInvContext.tempDir) return self._map_tree_to_clients_names(treeToReturn, scores[0]) def _getPathToRaxmlFromArgv(argv): RaxmlWrap.debugging = '-debug' in argv if len(argv) < 2: _LOG.error(' '.join(argv)) raise ValueError, 'Expecting the path to RAxML-VI as an argument' return argv[1] def createRaxmlWrap(registry, argv): return RaxmlWrap(registry, _getPathToPaupFromArgv([0] + argv)) # we add an empty element to act like the script name in a sys.argv list def defaultRaxmlWrap(): """Returns a RaxmlWrap object with a getCipresRegistry""" return RaxmlWrap(getCipresRegistry(), getCipresProperty('registryRaxml')) def scoresFromRaxml(fn): "Takes the name of a parent directory, a file tag (required by RAxML) and returns of list of pairs of floats: [[time, score], ] for the iterations" tsPairs = readRaxmlLogFilepath(fn) return [i[_SCORE_IN_LOG] for i in tsPairs] if __name__=='__main__': argv = sys.argv raxmlPath = _getPathToRaxmlFromArgv(argv) def RaxmlWrapFactory(registry, pathToRaxml = raxmlPath): return RaxmlWrap(registry, pathToRaxml) try: interfaceDict = {} for k in ['TreeEvaluate', 'TreeImprove', 'TreeInfer']: interfaceDict[k] = RaxmlWrapFactory cipresServe(argv, interfaceDict) except: logException(_LOG) """ class PaupWrap(object, CipresIDL_api1__POA.TreeImprove, CipresIDL_api1__POA.TreeInfer, CipresIDL_api1__POA.TreeEvaluate): debugging = True def __init__(self, registry, paupPath): self.nexusReader = None self.bootSearchDefault = 'Default HSearch nomultrees swap=spr ' self.bootDefTemplate = 'Default Boot nreps = %d cutoffpct=0 grpfreq=yes ' self.searchRNGSeeder = None self.bootRNGSeeder = None self.bootFreqFile = 'bootfreqs.log' self.nBootReps = 100 self.treeImproveDefault = 'Default HSearch nomultrees swap=spr ' self.treeInferDefault = 'Default HSearch nomultrees swap=spr start=stepwise ' self.treeEvaluateDefault = '' self.futurePaup = False # in future releases paup will support searching on partial leaf sets # if we aren't using future paup, we need to translate the taxon labels if not os.path.exists(paupPath): raise ValueError, 'The specified path to PAUP (%s) does not exist.' % paupPath self.paupPath = paupPath self.constraints = [] self.criterion = OptimalityCriterion.PARSIMONY self.model = None self.taxLabels = None self.taxManager = None """ # What follows is the output of running "raxmlHPC -h" on the version that this wrapper # was written for (and tested against): # # #This is RAxML-HPC version 1.0 released by Alexandros Stamatakis in March 2005 #Please also consult the RAxML-manual #To report bugs send an email to stamatak@ics.forth.gr # # #raxmlHPC[-MPI|-OMP] [-a weightFileName] [-b bootstrapRandomNumberSeed] [-c numberOfCategories] [-f d|e] # [-h] [-i initialRearrangementSetting] [-j] [-m GTRCAT|GTRMIX|GTRGAMMA] # [-t userStartingTree] [-w workingDirectory] [-v] [-y] [-# numberOfRuns] # -s sequenceFileName -n outputFileName # # -a Specify a column weight file name to assign individual weights to each column of # the alignment. Those weights must be integers separated by any type and number # of whitespaces whithin a separate file, see file "example_weights" for an example. # # -b Specify an integer number (random seed) and turn on bootstrapping # # DEFAULT: OFF # # -c Specify number of distinct rate catgories for RAxML when modelOfEvolution # is set to GTRCAT or GTRMIX # Individual per-site rates are categorized into numberOfCategories rate # categories to accelerate computations. # # DEFAULT: 25 # # -f select algorithm: # # "-f d": rapid hill-climbing # "-f e": optimize model+branch lengths for given input tree under GTRGAMMA only # # DEFAULT: rapid hill climbing # # -i Initial rearrangement setting for the subsequent application of topological # changes phase # # DEFAULT: determined by program # # -j Specifies if checkpoints will be written by the program. If checkpoints # (intermediate tree topologies) shall be written by the program specify "-j" # # DEFAULT: OFF # # -h Display this help message. # # -m Model of Nucleotide Substitution: # # "-m GTRCAT": GTR + Optimization of substitution rates + Optimization of site-specific # evolutionary rates which are categorized into numberOfCategories distinct # rate categories for greater computational efficiency # if you do a multiple analysis with "-#" but without bootstrapping the program # will use GTRMIX instead # "-m GTRGAMMA": GTR + Optimization of substitution rates + GAMMA model of rate # heterogeneity (alpha parameter will be estimated) # "-m GTRMIX": Inference of the tree under GTRCAT # and thereafter evaluation of the final tree topology under GTRGAMMA # # DEFAULT: GTRCAT # # -n Specifies the name of the output file. # # -s specify the name of the alignment data file in PHYLIP format # # -t Specify a user starting tree file name in Newick format # # -v Display version information # # -w Name of the working directory where RAxML will write its output files # # DEFAULT: current directory # # -y If you want to only compute a parsimony starting tree with RAxML specify "-y", # the program will exit after computation of the starting tree # # DEFAULT: OFF # # -# Specify the number of alternative runs on distinct starting trees # In combination with the "-b" option, this will invoke a multiple boostrap analysis # # DEFAULT: 1 single analysis #