#!/usr/bin/env python
"""Read a NEXUS file through the CIPRES python library.

Depending on the command line options the parsed content is written to a
normalized NEXUS file (-n), echoed to standard output (-p), and/or written
out and re-read to check that the written form is equivalent to the input
(-c, a round trip test).
"""
import sys
import os
import traceback
import itertools
from optparse import OptionParser

try:
    import PIPRes
    from PIPRes.basic import readFilePathAsNexus, writeAsNexus
    from PIPRes.util.client_import import *
    from PIPRes.util.io import tryRemove, tryRmdir, makeCipresTempDir
except Exception:
    # `except Exception` (rather than a bare except) lets Ctrl-C and an
    # explicit SystemExit raised during import propagate unchanged.
    sys.exit("The importing of CIPRES python failed.\nMake sure that your environment is configured correctly.")


def parse_file(fn):
    """Parse the NEXUS file at path `fn` and return the resulting blobs.

    The file is read with readFilePathAsNexus; each returned blob has its
    `treeList` replaced by a list of CipresTree objects built from the
    original entries.

    The process exits (via sys.exit) with an explanatory message if `fn`
    does not exist or if parsing fails.
    """
    if not os.path.exists(fn):
        sys.exit(fn + " not found.")
    try:
        blobs = readFilePathAsNexus(fn)
    except CipresIDL_api1.NexusException as e:
        sys.exit("\nNEXUS error at line %d:\n%s\n\n" % (e.lineNumber, e.errorMsg))
    except Exception:
        # The first positional parameter of print_exc is `limit`, so the
        # stream has to be passed by keyword.
        traceback.print_exc(file=sys.stderr)
        sys.exit("\n\nError during NEXUS parsing")
    for blob in blobs:
        blob.treeList = [CipresTree(tree) for tree in blob.treeList]
    return blobs


def main():
    """Command line entry point: parse the input file and act on the options."""
    parser = OptionParser()
    parser.add_option("-n", "--normalized", dest="normalized", default="",
                      help="destination for NEXUS normalized output to be written",
                      metavar="FILE")
    parser.add_option("-p", "--print", dest="to_std_out", default=False,
                      action="store_true",
                      help="Write the data that was read to standard out",)
    parser.add_option("-c", "--check", dest="round_trip", default=False,
                      action="store_true",
                      help="check that the written output is equivalent to the input (perform a round trip test).",)
    (options, args) = parser.parse_args()
    if not args:
        sys.exit("Expecting a data file as a command line argument")
    fn = args[0]

    # The return value is not used below; the call is retained in case it
    # performs required initialization -- TODO confirm.
    registry = getCipresRegistry()

    # Decide where normalized output goes.  A round trip test without an
    # explicit -n destination writes into a temporary directory that is
    # removed again before exiting.
    temp_directory = None
    out_file_path = None
    if options.normalized:
        out_file_path = options.normalized
    elif options.round_trip:
        temp_directory = makeCipresTempDir(prefix="nxs")
        out_file_path = os.path.join(temp_directory, "normalized.nex")

    try:
        content = parse_file(fn)

        if out_file_path is not None:
            # Open the destination only after the input parsed successfully,
            # so a failed parse neither truncates the destination nor leaves
            # an open handle behind.
            with open(out_file_path, "w") as normalized_out:
                writeAsNexus(normalized_out, content, appending=False)

        if options.round_trip:
            sec_parse = parse_file(out_file_path)
            m = "Could not successfully convey all of the info in %s into the file %s" % (fn, out_file_path)
            if len(sec_parse) != len(content):
                sys.exit(m)
            # NOTE(review): relies on the blob objects implementing `!=`
            # meaningfully -- confirm against the PIPRes blob classes.
            for s, c in zip(sec_parse, content):
                if c != s:
                    sys.exit(m)

        if options.to_std_out:
            writeAsNexus(sys.stdout, content, appending=False)
    finally:
        if temp_directory:
            # The temporary file is absent if parsing failed before writing.
            if os.path.exists(out_file_path):
                tryRemove(out_file_path)
            if not tryRmdir(temp_directory):
                sys.exit('could not remove ' + temp_directory)


if __name__ == '__main__':
    main()