#!/usr/bin/python
"""An example implementation of a CIPRES service in python.

This example wraps clustalw, a popular tool for multiple sequence alignment.
"""
import os
import sys

################################################################################
# util.server_import imports several functions and classes that are frequently
#   used when writing service implementations (this allows you to ignore much
#   of the package structure of PIPRes for simple tasks).
################################################################################
from PIPRes.util.server_import import *

################################################################################
# Using a logging object configured by the CIPRES library
#   allows for configuration of the amount and formatting of
#   output using a logging_pipres.conf file (in
#   $CIPRES_ROOT/share/cipres/resources or $(CIPRES_USER_HOME))
################################################################################
_LOG = cipresGetLogger('cipres.example.service_impl.clustal_wrap.py')

from PIPRes.wrap.external_program import SimpleExternalWrapperServer
from PIPRes.basic import MatrixFormat


class ClustalWrap(SimpleExternalWrapperServer, CipresIDL_api1__POA.MatrixAlign):
    """Our class inherits from the IDL interface that we are implementing.

    SimpleExternalWrapperServer is a base class that includes basic
    implementation of some core parts of the CIPRES object interface
    (e.g. "execute" and "remove") as well as providing hooks for dealing
    with file-based communication with an external application (clustal in
    this case).

    In this example we use "lazy" evaluation of the alignment.  Clustal is
    invoked the first time one of its outputs is requested
    (e.g. "getAlignedMatrix" or "getScore").
    """

    def __init__(self, registry, clustalPath):
        """Store the registry and the validated path to clustal.

        "registry" must be a CipresRegistryInterface instance.
        "clustalPath" should be the path to the clustal executable.

        Raises ValueError if "clustalPath" does not exist or is not
        executable.
        """
        self.clustalPath = clustalPath
        if not os.access(clustalPath, os.X_OK):
            # Distinguish "missing" from "present but not executable" so the
            # caller gets an actionable message.
            if not os.path.exists(clustalPath):
                raise ValueError('%s does not exist.' % clustalPath)
            raise ValueError('%s is not an executable.' % clustalPath)
        SimpleExternalWrapperServer.__init__(self, registry)
        self.registry = registry
        self.taxa = []
        self.inputMatrix = None
        self.userGuideTree = None
        self._resetOutputs()

    def _resetOutputs(self):
        """Called internally when the inputs change (so that a new alignment
        is performed whenever inputs change, but not when it is not
        required)."""
        self.alignedMatrix = None
        self.guideTree = None
        self.score = None

    def setTaxa(self, taxa):
        """Store the taxa (does not invalidate cached outputs)."""
        self.taxa = taxa

    def getTaxa(self):
        """Return the taxa most recently passed to setTaxa."""
        return self.taxa

    def setInputMatrix(self, matrix):
        """Store the matrix to align and discard any cached outputs."""
        self._resetOutputs()
        self.inputMatrix = matrix

    def setGuideTree(self, guideTree):
        """Store a user-supplied guide tree and discard any cached outputs."""
        self._resetOutputs()
        self.userGuideTree = guideTree

    def getAlignedMatrix(self):
        """Return the aligned matrix, calling runClustal first if needed."""
        if self.alignedMatrix is None:
            self.runClustal()
        assert self.alignedMatrix is not None
        return self.alignedMatrix

    def getTree(self, proxyConsumer):
        """Return the user-supplied guide tree if there is one; otherwise
        return the cached guide tree, calling runClustal first if needed.

        "proxyConsumer" is accepted but not used here.
        """
        if self.userGuideTree is not None:
            return self.userGuideTree
        if self.guideTree is None:
            self.runClustal()
        assert self.guideTree is not None
        return self.guideTree

    def getScore(self):
        """Return the cached score, calling runClustal first if needed."""
        # The original signature omitted "self", which made this method
        # impossible to call on an instance.
        if self.score is None:
            self.runClustal()
        assert self.score is not None
        return self.score

    def runClustal(self):
        """Invokes clustal using the current state of the ClustalWrap
        instance to compose commands and provide input matrix and tree.

        Raises an exception if "setInputMatrix" has not been called.
        """
        if self.inputMatrix is None:
            raise RuntimeError('Failed to call setInputMatrix before requesting alignment')
        ########################################################################
        # SimpleExternalWrapperServer._createTempDataFile() creates a temporary
        #   directory with data files
        ########################################################################
        extInvContext = self._createTempDataFile(
            matrix=self.inputMatrix,
            tree=self.userGuideTree,
            dirPref='clustalWrap',
            matrixFormat=MatrixFormat.FASTA,
            treeForTaxaOnly=True,
        )
        _LOG.debug(os.path.join(extInvContext.tempDir, extInvContext.matrixFilename))
        # NOTE(review): as written, this method only stages the input files;
        # it never launches clustal nor assigns alignedMatrix / guideTree /
        # score, so the assertions in the getters will fail after this call.
        # The invocation and output parsing still need to be implemented.


def createClustalWrap(registry, argv):
    """Build a ClustalWrap, taking the clustal path from "argv"."""
    # we add an empty element to act like the script name in a sys.argv list
    return ClustalWrap(registry, _getPathToClustalFromArgv([0] + argv))


def defaultClustalWrap():
    """Returns a ClustalWrap object built from getCipresRegistry() and the
    'registryClustal' CIPRES property."""
    return ClustalWrap(getCipresRegistry(), getCipresProperty('registryClustal'))


def _getPathToClustalFromArgv(argv):
    """Return argv[1] as the path to clustal.

    Also sets ClustalWrap.debugging according to whether '-debug' is in
    "argv".  Raises ValueError if no path argument is present or if the path
    does not exist.
    """
    ClustalWrap.debugging = '-debug' in argv
    if len(argv) < 2:
        # argv may contain a non-string placeholder in slot 0 (see
        # createClustalWrap), so stringify before joining.
        _LOG.error(' '.join(map(str, argv)))
        raise ValueError('Expecting the path to Clustal as an argument')
    gp = argv[1]
    if not os.path.exists(gp):
        raise ValueError('Expecting the path to Clustal as an argument')
    return gp


if __name__ == '__main__':
    argv = sys.argv
    clustalPath = _getPathToClustalFromArgv(argv)

    def clustalWrapFactory(registry, pathToClustal=clustalPath):
        return ClustalWrap(registry, pathToClustal)

    try:
        interfaceDict = {}
        for k in ['MatrixAlign']:
            # NOTE(review): the original registered CipresTestingClient here
            # and left clustalWrapFactory unused; registering the factory
            # looks like the intent -- confirm against cipresServe.
            interfaceDict[k] = clustalWrapFactory
        cipresServe(argv, interfaceDict)
    except:
        # Top-level boundary: log whatever escaped the server loop.
        logException(_LOG)


# Disabled code: the names below (GarliWrap, garliPath, ...) refer to a GARLI
# wrapper rather than clustal.  The block is never executed.
if False:
    def writeConfFile(self, file_obj):
        genOptsStr = '\n'.join(['%s = %s' % (i, str(self.config[i])) for i in GarliWrap.generalOpts])
        masterOptsStr = '\n'.join(['%s = %s' % (i, str(self.config[i])) for i in GarliWrap.masterOpts])
        file_obj.write("[general]\n%s\n[master]\n%s\n" % (genOptsStr, masterOptsStr))

    def _garliSearch(self, startTree):
        # @todo not thread safe until we deal with the
        #   fact that _createTempDataFile stores the tree in self.mappedTree
        #   for now, we'll just make sure that calls _garliSearch are serialized
        assert(self.__dict__.get('mappedTree') is None)
        tempDirectory, matFilePair, treeFilePair, toExternalTranslation, treeTaxLabels = self._createTempDataFile(
            matrix=self.mat,
            tree=startTree,
            dirPref='garliWrap',
            matrixFormat=MatrixFormat.RelaxedPHYLIP,
            treeForTaxaOnly=True
        )
        matFileName, matFileStream = matFilePair
        matFileStream.close()
        confFileName = 'garli.conf'
        outTag = self.config['ofprefix']
        bestTreeFileName = outTag + '.best.tre'
        extraPaths = [bestTreeFileName, confFileName, matFileName, matFileName + '.comp']
        if startTree is not None:
            startTreefileName='start.tre'
            f = open(os.path.join(tempDirectory, startTreefileName), 'w')
            f.write("%s %s;\n" % (str(self.model), self.mappedTree.getNewick()))
            f.close()
            extraPaths.append(startTreefileName)
        else:
            startTreefileName = None
        numberedPaths = [outTag + '.log%.02d.log', outTag + '.treelog%02d.log']
        try:
            cfg = None
            try:
                cfg = open(os.path.join(tempDirectory, confFileName), 'w')
                self.config['datafname'] = matFileName
                self.config['streefname'] = startTreefileName is not None and startTreefileName or 'random'
                self.writeConfFile(cfg)
            finally:
                if cfg is not None:
                    cfg.close()
            _LOG.debug('Calling %s from %s' % (self.garliPath, tempDirectory))
            self._callProcess(tempDirectory, [self.garliPath])
            finalTreePath = os.path.join(tempDirectory, bestTreeFileName)
            score, modelString, treeString = parseGarliBestTreeFile(finalTreePath)
            treeToReturn = CipresTree(treeString)
        finally:
            self.mappedTree = None #@todo
            if not self.debugging:
                self.cleanupTemporaries(tempDirectory, extraPaths, numberedPaths)
            else:
                _LOG.debug('leaving the temporary directory %s (running in debug mode)' % tempDirectory)
        t = addScoreAndMapLeaves(treeToReturn, score, OptimalityCriterion.LIKELIHOOD, toExternalTranslation)
        return self.convertTreeFunc(t)