#!/usr/bin/env python
import sample_lib as lib
import sys
import os
import textwrap


class Submission(object):

    def __init__(self, argv):
        # COMMAND LINE PARSING
        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('--account', metavar="ACCOUNT", type=str, default=lib.account,
                            help="The account string to use when submitting jobs. Default is read from config files.")
        parser.add_argument('--url', metavar="URL", dest="URL", type=str, help="Notification URL")
        try:
            cmdline_options, cmd = parser.parse_known_args(argv)
            # The job command line is everything after "--" (or after the program name if there is no "--").
            cmd = cmd[1:] if not ('--' in cmd) else cmd[cmd.index('--') + 1:]
        except Exception as e:
            print("There was a problem submitting your job")
            print(e)
            sys.exit(1)
        self.cmdline = cmd
        self.account = cmdline_options.account
        self.url = cmdline_options.URL
        self.tooltype = lib.getToolType(self.cmdline)
        self.scheduler_info = lib.schedulerInfo(lib.getProperties("scheduler.conf"), self.tooltype)

    def createEpilog(self):
        # Write the srun epilog script, which records job termination info in term.txt.
        rfile = open(lib.epilogue, "w")
        text = """#!/bin/bash
date +'%%s %%a %%b %%e %%R:%%S %%Z %%Y' > %s/term.txt
squeue -j $SLURM_JOB_ID -l >> %s/term.txt
echo "This file created by srun of: $*" >> %s/term.txt
# scontrol show jobid $SLURM_JOB_ID >> %s/term.txt
""" % (lib.jobdir, lib.jobdir, lib.jobdir, lib.jobdir)
        rfile.write(textwrap.dedent(text))
        rfile.close()
        os.chmod(lib.epilogue, 0o744)

    def createCommandFiles(self):
        cmdfileList = None
        if self.isSimpleWorkflow():
            cmdfileList = lib.splitCommandLine(self.cmdline)
        else:
            # Write the command line to a file, batch_command.cmdline.
            rfile = open(lib.cmdfile, "w")
            rfile.write("#!/bin/sh\n")
            rfile.write(" ".join(self.cmdline) + "\n")
            rfile.close()
            os.chmod(lib.cmdfile, 0o744)
            cmdfileList = [lib.cmdfile]
        return cmdfileList

    def submitBatchScript(self):
        # If this is a "direct" run type job we don't need to create a batch script; we just run
        # batch_command.cmdline. The command is then responsible for creating done.txt and for
        # running an epilog script to create term.txt.
        if self.scheduler_info["is_direct"]:
            return lib.submitDirectJob(self.account, self.url, lib.email, lib.jobname, self.cmdline)

        self.createEpilog()
        self.cmdfileList = self.createCommandFiles()

        # Create the sbatch script.
        rfile = open(lib.runfile, "w")
        rfile.write("#!/bin/sh\n")
        text = """
        #SBATCH -p %s
        #SBATCH --qos=cipres
        #SBATCH --job-name %s
        #SBATCH --time=00:%d:00
        #SBATCH -o _scheduler_stdout.txt
        #SBATCH -e _scheduler_stderr.txt
        # TODO: make sure umask works, not sure what the corresponding SBATCH cmd is.
        #PBS -W umask=0007
        #SBATCH --export=ALL
        #SBATCH --mail-user="%s"
        #SBATCH --mail-type="all"
        #SBATCH -A %s
        """ % (self.scheduler_info["queue"], lib.jobname, self.scheduler_info["runtime"], lib.email, self.account)
        rfile.write(textwrap.dedent(text))
        if self.scheduler_info["gpu"] > 0:
            # TODO: make sure this comes out right.
text = "#SBATCH -N %d\n" % (self.scheduler_info["nodes"]) + "#SBATCH --gres=gpu:%d\n" % (self.scheduler_info["gpu"]) else: # --ntasks-per-node is the (number_mpi_processes * threads_per_process)/nodes = cores_used_per_node text = "#SBATCH -N %d\n#SBATCH --ntasks-per-node=%d\n" % (self.scheduler_info["nodes"], self.scheduler_info["cores_used_per_node"]) rfile.write(textwrap.dedent(text)) text = """ cd %s source /etc/profile.d/modules.sh date +'%%s %%a %%b %%e %%R:%%S %%Z %%Y' > start.txt export MV2_CPU_MAPPING=`genmap -tpr %d` echo "MV2_CPU_MAPPING=$MV2_CPU_MAPPING" curl %s\&status=START export CIPRES_THREADSPP=%d export CIPRES_NP=%d """ % (lib.jobdir, int(self.scheduler_info["threads_per_process"]), # genmap -tpr self.url, # curl int(self.scheduler_info["threads_per_process"]), # CIPRES_THREADSPP int(self.scheduler_info["mpi_processes"])) # CIPRES_NP rfile.write(textwrap.dedent(text)) if not self.isSimpleWorkflow(): text = """ srun -u --epilog=%s -N %d -n %d --cpus-per-task=%d --mpi=pmi2 %s 1>stdout.txt 2>stderr.txt """ % ( lib.epilogue, # srun --epilog int(self.scheduler_info["nodes"]), # srun -N int(self.scheduler_info["mpi_processes"]), # srun -n int(self.scheduler_info["threads_per_process"]), # srun --cpus-per-task lib.cmdfile) # cmd to run (this is the %s, right before ">") rfile.write(textwrap.dedent(text)) else: if (len(self.cmdfileList) > 1): thereIsMore = "&&" else: thereIsMore = "" text = """ srun -u --epilog=%s -N %d -n %d --cpus-per-task=%d --mpi=pmi2 %s 1>stdout.txt 2>stderr.txt %s """ % ( lib.epilogue, # srun --epilog int(self.scheduler_info["nodes"]), # srun -N int(self.scheduler_info["mpi_processes"]), # srun -n int(self.scheduler_info["threads_per_process"]), # srun --cpus-per-task self.cmdfileList[0], thereIsMore ) rfile.write(textwrap.dedent(text)) # Write the rest of the commands. Starting with the 2nd one (we already did the first above). count = 2 for f in self.cmdfileList[1:] : if (len(self.cmdfileList) > count): thereIsMore = "&&" else: thereIsMore = "" text = """ srun -u --epilog=%s -N 1 -n 1 --cpus-per-task=%d --mpi=pmi2 %s 1>>stdout.txt 2>>stderr.txt %s """ % ( lib.epilogue, int(self.scheduler_info["threads_per_process"]), f, thereIsMore) rfile.write(textwrap.dedent(text)) count = count + 1 rfile.write("\nretval=$?\n") rfile.write("date +'%s %a %b %e %R:%S %Z %Y' > done.txt\n") rfile.write("curl %s\&status=DONE\n" % self.url) rfile.write("exit $retval") rfile.close() lib.log("./_JOBINFO.TXT","\nChargeFactor=%f\ncores=%i" % ( self.scheduler_info.get('ChargeFactor',1.0), self.scheduler_info['cores_used']) ) # print "made it to end" # sys.exit(0) return lib.submitJob() return 0 def isSimpleWorkflow(self): return self.scheduler_info["workflow_type"] == "simple" def main(argv=None): """ Usage is: submit.py [--account ] [--url ] -- Run from the working dir of the job which must contain (in addition to the job files) a file named scheduler.conf with scheduler properties for the job. , if present, gives the project to charge the job to. Url is the url of the submitting website including the taskid parameter. Returns 0 with "jobid=" on stdout if job submitted ok Returns 1 with multiline error message on stdout if error. Returns 2 for the specific error of queue limit exceeded. """ submission = Submission(argv) submission.submitBatchScript() if __name__ == "__main__": sys.exit(main())