#!/usr/bin/python '''Utility classes with implementation details that are common to wrapping NEXUS-based programs''' import os from PIPRes.util.server_import import * from PIPRes.util.io import tryRemove, tryRmdir, makeCipresTempDir from PIPRes.basic import writeTrees, MatrixFormat, writeDataMatrix, readTreeFromPhylipFile from PIPRes.cipres_types import * _LOG = cipresGetLogger('pipres.wrap.nexus_program_server.py') MAKE_TMP_DIR_PUBLIC = True def mapTreeToLeafSubset(tree): '''This function is used when you have a data structure (e.g. a matrix) for all of the taxa up to ntax, but you only want to write out data pertaining to the taxa that appear on "tree". The function maps the tree to a new tree with no gaps in the leaf set. Returns a tuple of: * tree with mapped leaves, * list of new "local" tax names, * list of indices of a full matrix that should correspond to the tree, * fromExternalTranslation mapping function see WrapExternalProgram._createTempDataFile() for an example of how this is used.''' externalList = map(lambda x: x - 1, tree.getLeafSet()) toLocalTranslation, externalTranslation = createMappingFuncs(externalList) mappedTree = CipresTree(tree) mappedTree.mapLeaves(toLocalTranslation) mappedTree.taxaManager = None rows = copy.copy(externalList) rows.sort() return mappedTree, [('tax%d' % (1 + externalTranslation(i))) for i in xrange(mappedTree.nTaxa)], rows, externalTranslation def addScoreAndMapLeaves(treeToReturn, score, criterion, externalTranslation): if externalTranslation is not None: treeToReturn = CipresTree(treeToReturn) treeToReturn.mapLeaves(externalTranslation) addScore(treeToReturn, score, criterion) #_LOG.debug('Returning ' + str(treeToReturn)) return treeToReturn class InvocationStyleEnum: kUseSystem, kUseSpawn, kUseSubprocess = range(3) class ExternalInvocationContext: '''Instances are created WrapExternalProgram._createTempDataFile to bundle information about where external program files are written. The attributes are: "tempDir" = path to the (temporary) directory where files were written "filename" and "fileObj" are used if only one file is written "matrixFilename", "matrixFileObj", "treeFilename", "treeFileObj" are created by functions that create data files for programs that cannot accept trees within data files. "fromExternalTranslation" is a callable (or None) that can converts from ''' def __init__(self, **kwargs): self.tempDir = kwargs.get("tempDir") self.filename = kwargs.get("filename") self.fileObj = kwargs.get("fileObj") self.matrixFilename = kwargs.get("matrixFilename") self.matrixFileObj = kwargs.get("matrixFileObj") self.treeFilename = kwargs.get("treeFilename") self.treeFileObj = kwargs.get("treeFileObj") self.fromExternalTranslation = kwargs.get("fromExternalTranslation") self.treeTaxLabels = kwargs.get("treeTaxLabels") class WrapExternalProgram: """Class to be used as base-class. Provides simple functions for launching/killing subprocesses and cleaning up temporaries. ASSUMES that self.registry will refer to an instance of CipresRegistryInterface uses the following attribute names: runningProcesses (list of process ID's launched by this instance) mappedTree (tree with tax relabeled to according to which taxa are written in the data matrix) """ _invocationStyle = InvocationStyleEnum.kUseSpawn def killChildProcesses(subprocList): """Kills the processes associated with the list of process ID's passed in as arguments.""" import signal for s in subprocList: pid = isinstance(s, int) and s or int(s.pid) _LOG.debug('Killing process %d' % pid) if sys.platform == 'win32': # note the "pid" will actually be a handle on windows # got this code from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462 win32api.TerminateProcess(pid, -1) else: os.kill(pid, signal.SIGTERM) killChildProcesses = staticmethod(killChildProcesses) def _callProcess(self, dir, cmdAsList, invocationStyle=None, outReader=None, errReader=None, out_redirection=None, **kwargs): '''launches the "cmdAsList" from "dir" and then returns to the curdir at entry. `outReader` and `errReader` can be callbacks. If supplied (and subprocess is used). then the functions will be passed the launched process's stdout or sterr attribute *before* the process.wait is called. These functions must then launch threads for reading the stderr and stdout. Note if outReader is errReader then, only one call is made. ''' prevDir = os.path.abspath(os.curdir) os.chdir(dir) invocationStyleCode = len(kwargs) > 0 and InvocationStyleEnum.kUseSubprocess or invocationStyle if invocationStyleCode is None: invocationStyleCode = WrapExternalProgram._invocationStyle #invocationStyleCode = InvocationStyleEnum.kUseSpawn subP = None try: for k, v in os.environ.iteritems(): _LOG.debug('%-20s=%s' % (k,v)) rc = None if invocationStyleCode == InvocationStyleEnum.kUseSystem: cmd = ' '.join(cmdAsList) _LOG.debug('system(%s)' % cmd) rc = os.system(cmd) else: if invocationStyleCode == InvocationStyleEnum.kUseSpawn: _LOG.debug('spawnv(os.P_WAIT, %s, %s)' % (cmdAsList[0], str(cmdAsList))) subP = os.spawnv(os.P_NOWAIT, cmdAsList[0], cmdAsList) self.runningProcesses.append(subP) rc = os.waitpid(subP, 0)[1] else: _LOG.debug('subprocess.call(%s, %s)' % (str(cmdAsList), str(kwargs))) if outReader is None: if errReader is None: if out_redirection is None: subP = subprocess.Popen(cmdAsList, **kwargs) else: subP = subprocess.Popen(cmdAsList, stdout=out_redirection, stderr=subprocess.STDOUT, **kwargs) else: subP = subprocess.Popen(cmdAsList, stderr=subprocess.PIPE, **kwargs) errReader(subP.stderr) else: if not errReader is None: subP = subprocess.Popen(cmdAsList, stdout=subprocess.PIPE, **kwargs) else: if errReader is outReader: subP = subprocess.Popen(cmdAsList, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs) else: subP = subprocess.Popen(cmdAsList, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) errReader(subP.stderr) outReader(subP.stdout) self.runningProcesses.append(subP) rc = subP.wait() _LOG.debug('returned %d' % rc) _LOG.debug('process ran') return rc finally: os.chdir(prevDir) if subP is not None and subP in self.runningProcesses: self.runningProcesses.remove(subP) def cleanupTemporaries(self, tempDirectory, extraPaths, numberedPaths = []): '''Tries to remove "tempDirectory" after removing the files within it. "extraPaths" is a list of file paths (relative to tempDirectory) to be removed. "numberedPaths" is like but each entry should contain one %d (and no other printf style directives. Integers from 0, 1... will be substituted in (and then the file will be removed).''' for p in extraPaths: fp = os.path.join(tempDirectory, p) if not tryRemove(fp): _LOG.debug('could not remove ' + fp) for pTemplate in numberedPaths: p = pTemplate % 0 fp = os.path.join(tempDirectory, p) if not tryRemove(fp): _LOG.debug('could not remove ' + fp) i = 1 while tryRemove(os.path.join(tempDirectory, pTemplate % i)): i += 1 if not tryRmdir(tempDirectory): _LOG.debug('could not remove ' + tempDirectory) def _createTempDataFile(self, **kwargs): '''returns an ExternalInvocationContext instance with temp directory created and the matrix and tree already written to the files kwargs can contain: tree matrix matrixFilename treeFilename dirPref matrixFormat (default = MatrixFormat.PHYLIP The field self.mappedTree will be overwritten.''' tree = kwargs.get('tree') matrix = kwargs.get('matrix') dirPref = kwargs.get('dirPref', 'wrap') if tree is not None: self.mappedTree, treeTaxLabels, rows, fromExternalTranslation = mapTreeToLeafSubset(tree) else: self.mappedTree, treeTaxLabels, rows, fromExternalTranslation = None, [], None, None tempDirectory = makeCipresTempDir(prefix=dirPref) if MAKE_TMP_DIR_PUBLIC: os.chmod(tempDirectory, 0775) _LOG.debug('created directory %s' % tempDirectory) matrixFilename = kwargs.get('matrixFilename', 'infile') matrixFile = None if matrix is not None: filePath = os.path.join(tempDirectory, matrixFilename) matrixFile = open(filePath, 'w') _LOG.debug('writing matrix to ' + filePath) writeDataMatrix(matrix, matrixFile, rowSlice=rows, format=kwargs.get('matrixFormat', MatrixFormat.PHYLIP)) treeFilename = kwargs.get('treeFilename', 'treefile') treeFile = None if tree is not None and not kwargs.get('treeForTaxaOnly', False): treeToWrite = self.mappedTree filePath = os.path.join(tempDirectory, treeFilename) treeFile = open(filePath, 'w') writeTrees(treeFile, [treeToWrite], treeTaxLabels) return ExternalInvocationContext( tempDir=tempDirectory, matrixFilename=matrixFilename, matrixFileObj=matrixFile, treeFilename=treeFilename, treeFileObj=treeFile, fromExternalTranslation=fromExternalTranslation, treeTaxLabels=treeTaxLabels) # use subprocess in wrappers (which are usually simple servers) iff we can import it. try: import subprocess WrapExternalProgram._invocationStyle = InvocationStyleEnum.kUseSubprocess except ImportError: if sys.platform == 'win32': import win32all # we only need this if we can't use subprocess # sys.exit('subprocess module (version of python 2.4 and greater) is required.') class RNGSeeder(random.Random): """Very simple wrapper around that random.Random that adds a version of randint called "externalSeed" which can be used to seed other program's random number generators. """ def __init__(self, x=None): '''x can be a seed.''' random.Random.__init__(self, x) if x is not None: self.seed(x) else: self.nextExternalSeed = self.randint(2,2147483647) self.lastExternalSeed = None def seed(self, x): random.Random.seed(self, x) self.nextExternalSeed = x def externalSeed(self): '''returns a random int in the range [2,2147483647]. The last returned value is stored in the field lastExternalSeed.''' self.lastExternalSeed = self.nextExternalSeed self.nextExternalSeed = self.randint(2,2147483647) return self.lastExternalSeed class SimpleExternalWrapperServer(WrapExternalProgram, SimpleServer): def __init__(self, registry): '''"registry" should be a CipresRegistryInterface instance. Provides/reserves attribute names: runningProcesses = list of launched process ID's exiting = True if the process is cleaning up for exit.''' SimpleServer.__init__(self, registry) self.runningProcesses = [] self.exiting = False def __del__(self): '''Calls removeServants, but not self.remove().''' self.exiting = True _LOG.debug('__del__() called') self.removeServants() def remove(self): '''EXTERNAL METHOD: Removes servants and calls SimpleServer.remove to shutdown the orb.''' self.exiting = True _LOG.debug('remove() called') self.removeServants() SimpleServer.remove(self) def removeServants(self): '''Cleans up by killing launched processes.''' _LOG.debug('removeServants()') WrapExternalProgram.killChildProcesses(self.runningProcesses)