package org.ngbw.utils; import java.lang.management.ManagementFactory; import java.util.List; import java.util.Vector; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.log4j.Logger; import org.apache.log4j.NDC; import org.ngbw.sdk.Workbench; import org.ngbw.sdk.database.RunningTask; /** * Standalone program to submit jobs for running_tasks have status = RunningTask.STATUS_NEW. *

* System Properties: * submitter = submit for this submitter id only (see submitter column in db, normally * the url of the web site that created the job) *

* recover = if "true", handle entries that have already been tried once.
* I want to use a separate process for 1st tries so that they get priority over jobs we've already
* failed, in case there's something that bogs them down or means they'll never succeed.
* NOT USED RIGHT NOW. HANDLES ALL JOBS with status NEW, regardless of ATTEMPTS FIELD.
*

*/ public class SubmitJob { private static final Logger logger = Logger.getLogger(SubmitJob.class); private static ThreadPoolExecutor threadPool = null; Vector inProgressList = new Vector(); private static String m_submitter; private static String m_default_submitter; private static long m_poll_interval; private static int m_pool_size; private static boolean m_recover = false; private static String m_local; private static String m_status; // this number is kind of arbitrary, see use of threshold in the code below. private static int threshold; private static RestartRun restartRun = null; public SubmitJob () throws Exception { threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(m_pool_size); } synchronized static void shutdownAndAwaitTermination ( int count, TimeUnit units ) { logger.debug("shutting down"); threadPool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!threadPool.awaitTermination(count, units)) { threadPool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!threadPool.awaitTermination(20, TimeUnit.SECONDS)) { logger.error("ThreadPool did not terminate"); } } } catch ( InterruptedException ie ) { // (Re-)Cancel if current thread also interrupted threadPool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } private void keepWorking () throws Exception { while (true) { /* * log.debug("Threads busy=" + threadPool.getActiveCount() + ", jobs in Q=" + * threadPool.getQueue().size() + ", taskCount=" + threadPool.getTaskCount()); */ if (restartRun != null) { //log.debug("About to start restartRuns()"); restartRun.restartRuns(); } int jobsQueued; if ((threshold == 0) || (jobsQueued = threadPool.getQueue().size()) < threshold) { int jobCount = scanAndProcess(); } else { logger.warn("Thread pool has " + jobsQueued + " jobs queued, not queuing more until queue drains."); } // In recovery mode we 
only scan the database once. Otherwise if a resource is down we would keep // processing the same records over and over. if (m_recover || (m_local != null)) { return; } Thread.sleep(1000 * m_poll_interval); } } private int scanAndProcess () throws Exception { // select tasks with specified submitter and status that aren't locked. Pass in m_recover instead // of null if we care about recover versus inital attempt. List list = RunningTask.findReadyToSubmit(m_submitter, null, false); if (list.size() > 0) { String tmp = ""; for (RunningTask rt : list) { tmp += rt.getJobhandle() + "-" + rt.getRunNumber() + "-" + rt.getStatus() + "-" + (rt.getLocked() == null ? "" : rt.getLocked().toString()) + ", "; } logger.debug("Found " + list.size() + " tasks to process: " + tmp); } for (RunningTask rt : list) { if (!inProgressList.contains(rt.getJobhandle())) { threadPool.execute(this.new ProcessRunningTask(rt.getJobhandle(), rt.getRunNumber())); inProgressList.add(rt.getJobhandle()); } } return list.size(); } private class ProcessRunningTask implements Runnable { long m_taskId; String m_jobhandle; int m_runNumber; ProcessRunningTask ( String jobhandle, int runNumber ) throws Exception { m_jobhandle = jobhandle; m_runNumber = runNumber; } public void run () { long startTime = System.currentTimeMillis(); NDC.push("[jh=" + m_jobhandle + ", rn=" + m_runNumber + "]"); boolean gotLock = false; try { gotLock = RunningTask.lock(m_jobhandle, m_runNumber); if (gotLock) { RunningTask rt = RunningTask.find(m_jobhandle, m_runNumber); m_taskId = rt.getTaskId(); m_jobhandle = rt.getJobhandle(); NDC.pop(); NDC.push("[task=" + m_taskId + ", job=" + m_jobhandle + ", run=" + m_runNumber + "]"); // It's possible someone else changed the status before we managed to lock the record. if (!rt.getStatus().equals(m_status)) { logger.debug( "Skipping " + m_jobhandle + " run " + m_runNumber + ". 
Status isn't " + m_status + ", it's " + rt.getStatus()); } else { logger.debug("Submitting job " + m_jobhandle + ", run " + m_runNumber); StageAndSubmitATask.stageAndSubmit(m_jobhandle, m_runNumber); } } else { logger.debug("Skipping " + m_jobhandle + " run " + m_runNumber + ". Already locked."); } } catch ( Exception e ) { logger.error("", e); } catch ( Error t ) { logger.error("THREAD IS DYING.", t); throw t; } finally { try { if (gotLock) { RunningTask.unlock(m_jobhandle, m_runNumber); } } catch ( Exception e ) { logger.debug("", e); } catch ( Error t ) { logger.error("THREAD IS DYING.", t); throw t; } long elapsedTime = System.currentTimeMillis() - startTime; logger.debug("SubmitJob took " + elapsedTime + " ms, or " + elapsedTime / 1000 + " seconds."); inProgressList.remove(m_jobhandle); NDC.pop(); NDC.remove(); } } } public static void main ( String[] args ) { try { Workbench wb = Workbench.getInstance(); // This overrides db connection pool establised by Workbench. //ConnectionManager.getConnectionSource().close(); //ConnectionManager.setConnectionSource(new DriverConnectionSource()); m_default_submitter = wb.getProperties().getProperty("application.instance"); logger.info("Default Submitter (application.instance) = " + m_default_submitter); String p; p = wb.getProperties().getProperty("submitJob.poll.seconds"); if (p == null) { throw new Exception("Missing workbench property: submitJob.poll.seconds"); } m_poll_interval = new Long(p); p = wb.getProperties().getProperty("submitJob.pool.size"); if (p == null) { throw new Exception("Missing workbench property: submitJob.pool.size"); } m_pool_size = new Integer(p); //threshold = thread_pool_size * 4; threshold = 0; m_submitter = System.getProperty("submitter"); logger.info("Submitter (System property: 'submitter') " + m_submitter); if (m_submitter == null) { throw new Exception("Missing property 'submitter'"); } // Properties that control behavior when used in RecoverResults mode. // Todo: not using m_local yet. 
Also need to add ability to specify just a single task or resource // to process. m_recover = Boolean.getBoolean("recover"); m_local = System.getProperty("local"); m_status = RunningTask.STATUS_NEW; MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName mxbeanName = new ObjectName("org.ngbw.utils:type=ToolRegistryControl"); ToolRegistryControl mxbean = new ToolRegistryControl(); mbs.registerMBean(mxbean, mxbeanName); SubmitJob lr = new SubmitJob(); logger.debug("SUBMIT JOB: for submitter=" + m_submitter + ", poll_interval in seconds=" + m_poll_interval + ", thread pool size=" + m_pool_size + ", max jobs queued = " + threshold + (m_recover ? ", Recovery Mode" : ", Normal Mode")); /* * If previous process died, unlock it's records. Pass m_recover instead of null if we care about initial * versus retry */ RunningTask.unlockReadyToSubmit(m_submitter, null); String maxRestart = null; String minMinutes = null; String maxHours = null; maxRestart = wb.getProperties().getProperty("submitJob.restartrun.maxruns"); minMinutes = wb.getProperties().getProperty("submitJob.restartrun.minminutes"); maxHours = wb.getProperties().getProperty("submitJob.restartrun.maxhours"); if ((maxRestart != null || maxHours != null) && minMinutes != null) { int maxRestartCount = new Integer(maxRestart); int minMinutesGap = new Integer(minMinutes); int maxHoursCount = new Integer(maxHours); if ((maxRestartCount > 0 || maxHoursCount > 0) && minMinutesGap > 0) { restartRun = new RestartRun(); restartRun.setMaxRestart(maxRestartCount); restartRun.setMinMinutes(minMinutesGap); restartRun.setMaxHours(maxHoursCount); restartRun.setSubmitter(m_submitter); } } /* * In recover mode, theoretically, keepWorking returns after one pass. We want to give the * threads plenty of time to complete their work after keepWorking has queued up the jobs. 
*/ lr.keepWorking(); shutdownAndAwaitTermination(24, TimeUnit.HOURS); logger.debug("SUBMIT JOB: exitting normally."); } /* * Main thread can catch an exception if, for example it isn't able to connect * to the db. If main thread exits but leaves the threadpool threads alive but * idle the process sticks around and doesn't do anything. */ catch ( Throwable e ) { logger.error("Caught Exception. Calling shutdownAndAwaitTermination().", e); shutdownAndAwaitTermination(1, TimeUnit.MINUTES); logger.debug("SUBMIT JOB: exitting due to exception in main."); return; } } }