#!/usr/bin/python from PIPRes.util.io import getCipresTestFileDir, expandPath, getCipresTmpDir from PIPRes.util.cipres import getCipresProperty from PIPRes.testing import * _LOG = cipresGetLogger('test_cipres.test_read_nexus') parDir = getCipresTmpDir() test_install_dir = os.path.dirname(__file__) pipres_root = os.path.dirname(test_install_dir) testFilesDir = os.path.join(pipres_root, 'test', 'files') installTestFileDir = os.path.join(test_install_dir, 'files') class TestReadNexus(CipresTestingClient): serviceRegistryEntry = composeRegistryQuery(interface='ReadNexus') serviceType = CipresIDL_api1.ReadNexus def implPathMatrixTest(self, fileMatrixPairs): for path, expectedTaxa, expectedMatrix in fileMatrixPairs: dataFile = expandPath(path) if os.path.exists(dataFile): content = getContentFromNexus(dataFile, treatStringAsPath = True) triple = content[0] matrix = triple.matrixList[0] self.assertEqual(triple.taxa, expectedTaxa) self.assertEqual(matrix, expectedMatrix) else: _LOG.warn('%s not found!, test skipped.' % dataFile) def implTestFile(self, fileName): _LOG.debug('testing NEXUS reader on %s' % fileName) triple = getContentFromNexus(fileName, treatStringAsPath = True)[0] if len(triple.matrixList) == 0: _LOG.debug('No matrix found in %s' % fileName) return matrix = triple.matrixList[0] tmpFD, fn = tempfile.mkstemp(suffix = '.nex', dir=parDir) try: try: tempFile = os.fdopen(tmpFD, 'w') except: os.close(tmpFD) raise try: tempFile.write('#NEXUS\n') #_LOG.warn(str(matrix.symbolLookup)) writeNexusDataMatrix(tempFile, matrix, triple.taxa) if triple.treeList: triple.treeList = [CipresTree(i) for i in triple.treeList] writeTreesBlock(tempFile, triple.treeList, triple.taxa) tempFile.flush() sec = getContentFromNexus(fn, treatStringAsPath = True)[0] self.assertEqual(sec.matrixList[0], matrix) for i, t in enumerate(triple.treeList): self.assertEquals(t, CipresTree(sec.treeList[i])) self.assertEquals(sec.taxa, triple.taxa) finally: tempFile.close() finally: os.remove(fn) def implEnvFileListTests(self, parPath, fileList): if not os.path.exists(parPath): _LOG.warn('some tests were skipped because the directory %s does not exist' % parPath) return for file in fileList: dataFile = os.path.join(parPath, file) _LOG.debug('testing %s' % dataFile) if os.path.exists(dataFile): self.implTestFile(dataFile) else: _LOG.warn('%s not found!, test skipped.' % dataFile) def testSimple(self): '''This just reads a file with the NEXUS reader, writes it out (using python), reads it again, and verifies that the CipresIDL_api1.DataMatrix from both parsings are identical.''' filesToTest = [ 'from_hell.nex', 'taylor.nex' ] self.implEnvFileListTests(test_install_dir, filesToTest) def testInternalPoundNEXUS(self): self.implEnvFileListTests(test_install_dir, ['internalPoundNexus.nex']) def testMatrix(self): fileTaxaMatrixTuples = [ ( os.path.join(testFilesDir, 'from_hell.nex'), ['a b', 'c d', "e' 'f", 'g h'], CipresIDL_api1.DataMatrix('ACGT?N', 4L, 10L, [[0], [1], [2], [3], [-1, 0, 1, 2, 3], [0, 1, 2, 3]], [ [ 4, 5, -1, -1, -1, 1, 1, 0, 0, 0], [-1, -1, -1, -1, -1, 1, 1, 0, 0, 0], [ 4, 4, 4, 4, 4, 0, 3, 1, 2, 2], [ 5, 5, 5, 5, 5, 0, 3, 1, 2, 2], ], CipresIDL_api1.DNA_DATATYPE) ), ] self.implPathMatrixTest(fileTaxaMatrixTuples) def testPipres(self): '''This just reads a file with the NEXUS reader, writes it out (using python), reads it again, and verifies that the CipresIDL_api1.DataMatrix from both parsings are identical.''' filesToTest = [ '12Tx432C.nex', ] self.implEnvFileListTests(testFilesDir, filesToTest) def testFiles(self): someFiles = [ 'rbcl500TPaupTree.nex', 'mrB_9Tx720C.nex', 'rbcl500T.nex', 'TestNexusFile.nex',] someMoreFiles = 'SLOW_CIPRES_TESTS' in os.environ and ['11361Tx1360C.nex', '12Tx432C.nex', '49Tx2543C.nex', '500Tx759C.nex', '567Tx2153C.nex', '7180Tx1359C.nex'] or [] self.implEnvFileListTests(testFilesDir, someFiles + someMoreFiles) def _testMBFiles(self): self.implEnvFileListTests(None, [os.path.abspath('/Applications'), 'PhylogenetiX','MrBayes_V3.0_mac'], ['adh.nex', 'anolis.nex', 'avian_ovomucoids.nex', 'bglobin.nex', 'replicase.nex']) if __name__ == '__main__': if '-f' in sys.argv: def fast(self): print 'test skipped - fast mode' TestReadNexus.testFiles = fast sys.argv.remove('-f') try: nexusReader = TestReadNexus.getTestingRegistry().getServiceOrThrow(CipresIDL_api1.ReadNexus) nexusReader.remove() except: logException(_LOG) _LOG.warn('Tests skipped: Registry error') sys.exit(0) main()