"""
Search this file for "REPLACE" and replace as needed.  Note that some of this
example is very cipres specific, especially the getToolType function.

Helpers for building, submitting, querying and cancelling Slurm batch jobs.
The code is written so that it parses and runs on both Python 2.6+ and
Python 3.
"""
import os
import string
import math
import re
import pwd
import subprocess


def getUsername():
    """Return the login name of the user that owns the current process."""
    return pwd.getpwuid(os.getuid())[0]


# A property name is separated from its value by ':' or '=' or by a space
# that is not preceded by a backslash.
__property_separator_regex=""":|=|(?<=[^\\\\]) """
__property_regex = re.compile(__property_separator_regex)


def getProperties(filename):
    """
    Parse a Java-style properties file into a dictionary.

    Blank lines and lines starting with '!' or '#' are skipped.  Each
    remaining line is split at the first separator (':', '=' or an
    unescaped space); a line without a separator maps its whole text to an
    empty string.  An escaped space ('\\ ') in a name is turned into a plain
    space.  When a name occurs more than once the last value wins.

    filename: path of the properties file to read.
    Returns a dict mapping property names to string values.
    Raises IOError/OSError if the file cannot be opened.
    """
    propDict = dict()
    # splitlines() copes with \n, \r\n and \r endings, which is what the old
    # "rU" open mode was used for; "rU" no longer exists in current Python 3.
    with open(filename, "r") as propFile:
        lines = propFile.read().splitlines()
    for propLine in lines:
        propDef = propLine.strip()
        if len(propDef) == 0:
            continue
        if propDef[0] in ('!', '#'):
            continue
        separator_location = __property_regex.search(propDef)
        if separator_location is not None:
            found = separator_location.start()
        else:
            found = len(propDef)
        name = propDef[:found].rstrip().replace('\\ ', ' ')
        value = propDef[found:].lstrip(":= ").rstrip()
        propDict[name] = value
    return propDict


def getToolType(commandlineString):
    """
    Guess which tool a command line runs.

    commandlineString: a string, or a sequence of strings that is
        concatenated without separators before matching.
    Returns "garli", "raxml", "mrbayes" or "beast" for the first marker found
    (checked in that order, case-insensitively), or None if none matches.
    """
    text = "".join(commandlineString).lower()
    # Order matters: it decides the result when several markers are present.
    for marker, tooltype in (("garli", "garli"),
                             ("raxml", "raxml"),
                             ("mbhwrapper", "mrbayes"),
                             ("beast", "beast")):
        if marker in text:
            return tooltype
    return None


shared_queue = "shared"
# NOTE(review): the *_limit values look like minutes (20160 = 14 days) - confirm.
shared_queue_limit = 20160.0
short_queue = "compute"
short_queue_limit = 20160.0
gpu_queue = "gpu"
gpu_shared_queue = "gpu-shared"
cores_per_node = 24

# Effectively get rid of max_nodes by setting it to 5000
max_nodes = 5000
max_cores = max_nodes * cores_per_node
default_cores = cores_per_node

account = "REPLACE_THIS_IN_WITH_YOUR_ACCOUNT"
scheduler_file = "scheduler.conf"
max_file_size = 7812500.0
email = "REPLACE_THIS_IN_WITH_YOUR_EMAIL_ADDRESS"

jobname = ""
runfile = "./_batch_command.run"
statusfile = "./_batch_command.status"
cmdfile = "./_batch_command.cmdline"
jobdir = os.getcwd()
epilogue="%s/_epilogue.sh" % (jobdir)
# Overrides the empty default assigned above.
jobname = os.environ.get("WB_JOBID", "cipres")


def schedulerInfo(properties, tooltype):
    """
    Turn scheduler properties into the settings used to build a batch job.

    properties is a dictionary that may contain the keys: jobtype,
    mpi_processes, threads_per_process, nodes, runhours, node_exclusive, gpu
    and workflow_type.  gpu=n, where n is greater than 0, means use n gpus;
    the job then goes to the gpu or gpu-shared partition depending on
    node_exclusive.

    tooltype is accepted for interface compatibility and is not used here.

    Returns a dictionary with the keys: runtime (runhours converted to
    minutes and rounded up; 0.0 when runhours is missing or not a number),
    threads_per_process, nodes, mpi_processes, node_exclusive, is_direct,
    is_mpi, gpu, workflow_type, cores_used, cores_used_per_node and queue.

    Raises ValueError if one of the integer properties is not an integer and
    ZeroDivisionError if nodes is 0.
    """
    try:
        runtime = math.ceil(float(properties.get("runhours", 0.0)) * 60)
    except (TypeError, ValueError, OverflowError):
        # Unparsable run time: fall back to 0 minutes.
        runtime = 0.0

    jobtype = properties.get("jobtype", "serial")
    retval = {
        "runtime": runtime,
        "threads_per_process": int(properties.get("threads_per_process", 1)),
        "nodes": int(properties.get("nodes", 1)),
        "mpi_processes": int(properties.get("mpi_processes", 1)),
        "node_exclusive": int(properties.get("node_exclusive", 1)),
        "is_direct": jobtype == "direct",
        "is_mpi": jobtype == "mpi",
        "gpu": int(properties.get("gpu", 0)),
        "workflow_type": properties.get("workflow_type", "none"),
    }

    retval["cores_used"] = retval["mpi_processes"] * retval["threads_per_process"]
    retval["cores_used_per_node"] = math.ceil(float(retval["cores_used"]) / retval["nodes"])

    if retval["node_exclusive"]:
        if retval["gpu"]:
            retval["queue"] = gpu_queue
        else:
            retval["queue"] = short_queue
    else:
        if retval["gpu"]:
            retval["queue"] = gpu_shared_queue
        else:
            retval["queue"] = shared_queue
    return retval


def log(filename, message):
    """Append message (written as is, no newline added) to the file filename."""
    with open(filename, "a") as f:
        f.write(message)


def deleteJob(jobid, workingdir):
    """
    Cancel a job.

    If workingdir contains a file named cancelJobs, this changes the current
    directory to workingdir and runs "./cancelJobs <jobid>"; otherwise it
    runs "scancel <jobid>".

    jobid: the numeric job id, as an int or a string of digits (the form
        returned by jobInQueue).
    workingdir: the job's working directory.
    Raises ValueError if jobid is not numeric and SystemError if the cancel
    command exits with a non-zero status.
    """
    # Coerce first: "%d" does not accept the string ids jobInQueue returns,
    # and this keeps anything but a number out of the shell command.
    jobid = int(jobid)
    if os.path.isfile(workingdir + "/cancelJobs"):
        os.chdir(workingdir)
        cmd = "./cancelJobs %d" % jobid
    else:
        cmd = "scancel %d" % jobid
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    output, err = p.communicate()
    if p.returncode != 0:
        raise SystemError("Error running '%s', return code is %d. stdout is '%s', stderr is '%s'" % (cmd, p.returncode, output, err))


def jobInQueue():
    """
    Return list of all of the user's jobs that are in the queue and aren't
    completed or cancelled.  They may be waiting, running, etc.

    The job ids are returned as strings, as printed by squeue.
    Raises SystemError if squeue exits with a non-zero status or writes
    anything to stderr.
    """
    cmd = "squeue -u %s -h -o '%%.7i %%T'" % getUsername()
    # universal_newlines=True makes the pipes return text on Python 3 too.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    output, err = p.communicate()
    if p.returncode != 0:
        raise SystemError("Error running squeue, return code is %d. stderr is %s" % (p.returncode, err))
    if len(err) != 0:
        raise SystemError("Error running squeue, stderr is %s" % (err))

    jobs = []
    for row in output.split("\n"):
        r = row.split()
        # Each row is "<jobid> <state>".
        if len(r) > 1 and r[1] not in ("CANCELLED", "COMPLETED"):
            jobs.append(r[0])
    return jobs


def submitDirectJob(account, url, email, jobname, commandline):
    """
    Write commandline to the command file as a shell script and run it.

    " --account", " --url" and " --email" options carrying the given values
    are appended to the command.  jobname is accepted for interface
    compatibility and is not used here.

    On success the job id is printed to stdout as "jobid=N" and recorded in
    the status file and in ./_JOBINFO.TXT.

    Returns 0 on success and non-zero on failure.  An exit status of 2 from
    the script is reported as 1.
    """
    with open(cmdfile, "w") as rfile:
        rfile.write("#!/bin/sh\n")
        rfile.write(" ".join(commandline))
        rfile.write(" --account %s" % account)
        rfile.write(" --url %s" % url)
        rfile.write(" --email %s" % email)
        rfile.write("\n")
    os.chmod(cmdfile, 0o744)

    cmd = cmdfile
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    output = p.communicate()[0]
    retval = p.returncode
    if retval != 0:
        print("Error submitting job:\n")
        print(output)
        log(statusfile, "submitDirectJob is returning %d.\nStdout/stderr is:%s\n" % (retval, output))

        # When there's a bash syntax error in a script it exits with 2, but if we return 2, we've
        # defined that to mean "too many jobs queued" and cipres will print a special message.
        if retval == 2:
            retval = 1
        return retval
    log(statusfile, "Job submission stdout/stderr is: %s\n" % output)

    # output should be just the full job id, <id>.trestles-fe1.sdsc.edu:
    m = re.search(r"^(\d+).trestles.\S+", output, re.M)
    if m is not None:
        jobid = m.group(0)
        short_jobid = m.group(1)
        print("jobid=%d" % int(short_jobid))
        log(statusfile, "JOBID is %s\n" % jobid)
        log("./_JOBINFO.TXT", "\nJOBID=%s\n" % jobid)
        return 0
    print("Error, job submission says: %s" % output)
    log(statusfile, "can't find jobid, submitDirectJob is returning 1\n")
    return 1


def submitJob():
    """
    Submit the run file with sbatch.

    sbatch's stderr is appended to the status file.  On success the job id
    is printed to stdout as "jobid=N", recorded in the status file and in
    ./_JOBINFO.TXT, and passed to submitAttributes.

    Returns 0 on success, sbatch's exit status if sbatch fails, and 1 if no
    job id can be found in sbatch's output.  A return value of 2 is reserved
    for "too many jobs queued"; no sbatch status is currently mapped to it.
    """
    cmd = "sbatch -L cipres:1 %s 2>> %s" % (runfile, statusfile)
    log(statusfile, "running: %s\n" % (cmd))
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         universal_newlines=True)
    output = p.communicate()[0]
    retval = p.returncode
    if retval != 0:
        # read whatever sbatch wrote to the statusfile and print it to stdout
        print("Error submitting job, sbatch says:\n")
        with open(statusfile, "r") as f:
            print(f.read() + "\n\n")
        print(output)
        log(statusfile, "submit_job is returning %d\n" % retval)
        return retval
    log(statusfile, "sbatch output is: " + output + "\n" +
        "======================================================================" + "\n")

    # output from sbatch should be 'Submitted batch job N' where N is the jobid.
    m = re.search(r"^Submitted batch job (\d+)", output, re.M)
    if m is not None:
        jobid = m.group(1)
        print("jobid=%d" % int(jobid))
        log(statusfile, "JOBID is %s\n" % jobid)
        log("./_JOBINFO.TXT", "\nJOBID=%s\n" % jobid)
        submitAttributes(jobid)
        return 0
    print("Error, sbatch says: %s" % output)
    log(statusfile, "can't get jobid, submit_job is returning 1\n")
    return 1


def submitAttributes(jobid):
    """
    Report the gateway user, submit time and job id for a submitted job by
    running gateway_submit_attributes.

    The user is the "email" property of ./_JOBINFO.TXT, or
    "UNKNOWN@phylo.org" when that property is absent.  This is best effort:
    failures are written to the status file and never raised to the caller.
    """
    try:
        jobinfo = getProperties("./_JOBINFO.TXT")
        uid = jobinfo.get("email", "UNKNOWN@phylo.org").strip()
        submitdate = subprocess.Popen(["date", "+%F %T %:z"],
                                      stdout=subprocess.PIPE,
                                      universal_newlines=True).communicate()[0].strip()
        # The values are passed as an argument list rather than through a
        # shell, so quotes or other metacharacters in the e-mail address
        # cannot alter the command.  cmd is only used for log messages.
        argv = ["gateway_submit_attributes",
                "-gateway_user", uid,
                "-submit_time", submitdate,
                "-jobid", str(jobid)]
        cmd = "gateway_submit_attributes -gateway_user '%s' -submit_time '%s' -jobid '%s'" % (uid, submitdate, jobid)
        p = subprocess.Popen(argv, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, universal_newlines=True)
        output, err = p.communicate()
        if p.returncode != 0:
            log(statusfile, "%s failed with exit status %d\n" % (cmd, p.returncode))
            log(statusfile, "stdout: %s\n" % (output))
            log(statusfile, "stderr: %s\n" % (err))
        else:
            log(statusfile, "ran %s\n" % (cmd))
    except Exception as err:
        # Deliberately swallowed, but keep the reason in the status file.
        log(statusfile, "submitAttributes exception: %s\n" % (err,))


def splitCommandLine(cmd):
    """
    Splits cmd at && into individual commands, writing each one to a file
    named lib.cmdfile_I, where I is a number (1, ...).  Each file is a
    /bin/sh script with mode 0744; empty commands are skipped and do not
    consume a number.

    cmd: sequence of strings, joined with single spaces before splitting.
    Returns the list of filenames created.
    """
    joined = " ".join(cmd)
    filelist = []
    i = 1
    for piece in joined.split("&&"):
        piece = piece.strip()
        if not piece:
            continue
        filename = "%s_%s" % (cmdfile, i)
        with open(filename, "w") as rfile:
            rfile.write("#!/bin/sh\n")
            rfile.writelines((piece, "\n"))
        os.chmod(filename, 0o744)
        filelist.append(filename)
        i = i + 1
    return filelist