#!/usr/bin/python '''Wraps seq-gen to simulate data.''' from PIPRes.util.server_import import * from PIPRes.cipres_types import * from PIPRes.basic import writeTreesBlock import tempfile import os from PIPRes.wrap.nexus_program_server import * from PIPRes.util.io import tryRemove, tryRmdir from PIPRes.wrap.idl import hasEdgeLengths from PIPRes.nexus.primitives import NexusTaxaManager class DiscreteDataSimulator: #move to IDL def setModel(self, model): raise NotImplementedError, 'DiscreteDataSimulator is abstract' def setTree(self, tree): raise NotImplementedError, 'DiscreteDataSimulator is abstract' def setNumSites(self, nSites): raise NotImplementedError, 'DiscreteDataSimulator is abstract' def getNextMatrix(self): raise NotImplementedError, 'DiscreteDataSimulator is abstract' def getMatrix(self, nSites): raise NotImplementedError, 'DiscreteDataSimulator is abstract' def getDefaultTaxaNames(nTaxa): return ['t%d' % i for i in xrange(1, nTaxa + 1)] class SeqGenWrap(DiscreteDataSimulator, SimpleNexusWrapperServer, object): '''Can be used as a generator (returning up to nRealizations matrices). If nRealizations is set to None, then it will be an infinite generator.''' debugging = True def __init__(self, registry, sgPath, **kwargs): '''acceps kwargs 'model', 'nSites', and 'tree'. ''' self.debugging = SeqGenWrap.debugging SimpleNexusWrapperServer.__init__(self, registry) self.sgPath = sgPath self.convertTreeFunc = self.orb is None and toTreeBridge or toIDLTree self.registry = registry self.defaultArgs = [ '-mHKY', #HKY variant '-fe', # equal state freq # omitting -t for ti/tv means the ratio is 1 '-on', '-n1', # just simulating one data set at a time '-q', # quiet mode ] self.rng = RNGSeeder() self.touch() self._nSites = 0 self.nSites = 0 self.nRealizations = 1 self.model = None self.outputRedirectionFile = 'data.nex' self.tree = None self.nRealizationsReturned = 0 self.taxMgr = None def getModelArgs(self): #@ need to check model here return self.defaultArgs + self.getSeedArgs() def setSeed(self, n): self.rng.seed(n) def getSeedArgs(self): return ['-z%d' % self.rng.externalSeed()] def getPrevSeed(self): return self.rng.lastExternalSeed def getRateHetArgs(self): return [] # ['-a%f' % float(self.alpha)] def _getExtraPaths(self): extraPaths = [self.outputRedirectionFile] return extraPaths def setModel(self, model): if model != model: self.touch() raise NotImplementedError, 'SeqGenWrap is unfinished' def touch(self): self.matList = [] self.nRealizationsReturned = 0 def setTree(self, tree): '''seq-gen does not like polytomies. If given a tree with numeric taxlabels, it will produce a data matrix with unsorted numbers as taxa labels. This causes read_nexus to choke.''' self.touch() self.tree = isinstance(tree, Tree) and tree or CipresTree(tree) self.tree.makeBinary() self.correctTaxLabels = getDefaultTaxaNames(tree.nTaxa) self.taxMgr = NexusTaxaManager(self.correctTaxLabels) self.tree.taxaManager = self.taxMgr def getNumSites(self): return self._nSites def setNumSites(self, nSites): if nSites < 0: raise ValueError, 'nSites must be >= 0' if self._nSites != nSites: self.touch() self._nSites = long(nSites) def getNextMatrix(self): if len(self.matList) == 0: self.simulate() self.nRealizationsReturned += 1 toReturn = self.matList.pop() _LOG.debug('returning matrix') return toReturn def getMatrix(self, nSites): self.nSites = nSites return self.getNextMatrix() nSites = property(getNumSites, setNumSites) def checkState(self): if not self.tree: raise ValueError, 'setTree must be called first' if not hasEdgeLengths(self.tree): raise ValueError, 'model tree must have edge lengths' #if not self.model: # raise ValueError, 'model is not valid if self.nSites < 0: raise ValueError, 'nSites must be >= 0' def __iter__(self): nToReturn = self.nRealizations while True: if (self.nRealizations is not None) and (self.nRealizationsReturned >= self.nRealizations): raise StopIteration yield self.getNextMatrix() def __nonzero__(self): try: self.checkState() return True except: return False def getEmptyMatrix(self): #@ need to check model and return the appropriate data type here. return CipresDiscreteMatrix([], datatype=CipresIDL_api1.DNA_DATATYPE, hasGaps=False) def _readMatrix(self, matFile): if self.exiting: return nilMatrix() if self.nexusReader is None: _LOG.debug('getting ReadNexus object ref') self.nexusReader = self.registry.getService(CipresIDL_api1.ReadNexus) if self.nexusReader is None: raise RuntimeError, 'Could not acquire a ReadNexus servant' self.nexusReader.readNexusFile(matFile, CipresIDL_api1.ReadNexus.NEXUS_CHARACTERS_BLOCK_BIT | CipresIDL_api1.ReadNexus.NEXUS_TAXA_BLOCK_BIT) taxLabels = self.nexusReader.getTaxa(0) mat = CipresDiscreteMatrix(self.nexusReader.getCharacters(0)) mat.reorderMatrix(taxLabels, self.correctTaxLabels) #raise AssertionError, 'need to reorder taxa' return mat def simulate(self): self.checkState() if self.nSites == 0: self.matList = [self.getEmptyMatrix()] return # probably shouldn't be deriving from SimpleNexusWrapperServer because sg does not read NEXUS #I just want to be using the same tempdirectory code for all wrappers for the time being. extInvContext = self._createTempDataFile( matrix=None, supportsTaxaSubsets=False, writeTree=False, dirPref='sgWrap', filename='tree', writePoundNexus=False) try: extInvContext.fileObj.write('%s;\n' % self.tree.getNewick(TreeDisplay.demandEdgeLengths, False)) extInvContext.fileObj.close() fullPathToOutput = os.path.join(extInvContext.tempDir, self.outputRedirectionFile) segGenCmd = [self.sgPath] + self.getModelArgs() segGenCmd.append('-l%d' % self.nSites) if False: #piping input with subprocess does not seem to be working with seq-gen (typing the input interactively fails too) infile = open(os.path.join(extInvContext.tempDir, extInvContext.filename), 'rU') outfile = open(fullPathToOutput, 'w') self._callProcess(extInvContext.tempDir, segGenCmd, stdin=infile, stdout=outfile) # infile.close() outfile.close() self._callProcess(extInvContext.tempDir, segGenCmd + ['<', extInvContext.filename, '>', self.outputRedirectionFile ], invocationStyle=InvocationStyleEnum.kUseSystem) # # until we modify read_nexus_to handle multiple matrices (or better yet) make an iterable NEXUS reader # we just simulate one matrix at a time self.matList = [self._readMatrix(fullPathToOutput)] finally: if not self.debugging: self.cleanupTemporaries(extInvContext.tempDir, [extInvContext.filename] + self._getExtraPaths()) else: _LOG.debug('leaving the temporary directory %s (running in debug mode)' % extInvContext.tempDir) def multinomial(rng, proportions): r = rng.random() for n, p in enumerate(proportions): r -= p if r < 0.0: return n return n def countsFromMultinomial(rng, proportions, n): realization = [0] * len(proportions) for i in xrange(n): index = multinomial(rng, proportions) realization[index] += 1 return realization class MixtureSimulator(DiscreteDataSimulator, object): '''Can be used as a generator (returning up to nRealizations matrices). If nRealizations is set to None, then it will be an infinite generator.''' debugging = True def __init__(self, simPropList): '''acceps kwargs 'model', 'nSites', and 'tree'. ''' self.simulators = [i[0] for i in simPropList] self.expectedProportions = [i[1] for i in simPropList] self._nSites = 0 self.nSites = 0 self.nRealizations = 1 self.nRealizationsReturned = 0 self.rng = RNGSeeder() def getPrevSeed(self): return self.rng.lastExternalSeed def setModel(self, model): raise NotImplementedError, 'MixtureSimulator is unfinished' def touch(self): self.matList = [] self.nRealizationsReturned = 0 def setModel(self, model): raise NotImplementedError, 'MixtureSimulator is unfinished' def getNumSites(self): return self._nSites def setNumSites(self, nSites): if nSites < 0: raise ValueError, 'nSites must be >= 0' if self._nSites != nSites: self.touch() self._nSites = long(nSites) def getNextMatrix(self): if len(self.matList) == 0: self.simulate() self.nRealizationsReturned += 1 toReturn = self.matList.pop() _LOG.debug('returning matrix') return toReturn def getMatrix(self, nSites): self.nSites = nSites return self.getNextMatrix() nSites = property(getNumSites, setNumSites) def checkState(self): if self.nSites < 0: raise ValueError, 'nSites must be >= 0' def __iter__(self): nToReturn = self.nRealizations while True: if (self.nRealizations is not None) and (self.nRealizationsReturned >= self.nRealizations): raise StopIteration yield self.getNextMatrix() def __nonzero__(self): try: self.checkState() return True except: return False def getEmptyMatrix(self): #@ need to check model and return the appropriate data type here. return CipresDiscreteMatrix([], datatype=CipresIDL_api1.DNA_DATATYPE, hasGaps=False) def simulate(self): self.checkState() if self.nSites == 0: self.matList = [self.getEmptyMatrix()] return realizedCounts = countsFromMultinomial(self.rng, self.expectedProportions, self.nSites) subMat = [] for i, ns in enumerate(realizedCounts): simulator = self.simulators[i] subMat.append(simulator.getMatrix(ns)) superMatrix = subMat[0] if len(subMat) > 1: superMatrix.extend(subMat[1:]) self.matList = [superMatrix] _LOG = cipresGetLogger('pipres.service_impl.server.seqgen_wrap') def _getPathToSGFromArgv(argv): SeqGenWrap.debugging = '-debug' in argv if len(argv) < 2: raise ValueError, 'Expecting the path to seq-gen as an argument' return argv[1] def createSeqGenWrap(registry, argv): return SeqGenWrap(registry, _getPathToSGFromArgv([0] + argv)) # we add an empty element to act like the script name in a sys.argv list def defaultSeqGenWrap(): '''Returns a SeqGenWrap object with a getCipresRegistry, and path to seq-gen from the environment.''' return SeqGenWrap(getCipresRegistry(), os.environ['PATH_TO_SEQ_GEN']) if __name__=='__main__': argv = sys.argv sgPath = _getPathToSGFromArgv(argv) def SeqGenWrapFactory(registry, pathToSG = sgPath): return SeqGenWrap(registry, pathToSG) try: interfaceDict = {} for k in ['DiscreteDataSimulator']: interfaceDict[k] = SeqGenWrapFactory cipresServe(argv, interfaceDict) except: logException(_LOG)