package org.ngbw.utils;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import org.ngbw.sdk.Workbench;
import org.ngbw.sdk.database.RunningTask;
/**
* Standalone program to submit jobs for running_tasks that have status = RunningTask.STATUS_NEW.
*
* System Properties:
* submitter = submit for this submitter id only (see submitter column in db, normally
* the url of the web site that created the job)
*
* recover = if "true" handle entries that have already been tried once.
* I want to use a separate process for 1st tries so that they get priority over jobs we've already
* failed, in case there's something that bogs them down or means they'll never succeed.
* NOT USED RIGHT NOW. HANDLES ALL JOBS with status NEW, regardless of ATTEMPTS FIELD.
*
*/
public class SubmitJob
{
    private static final Logger logger = Logger.getLogger(SubmitJob.class);

    // Worker pool. It is created by the constructor, so it is still null if main() fails before
    // a SubmitJob instance has been built.
    private static ThreadPoolExecutor threadPool = null;

    // Job handles that have been handed to the pool and whose worker has not finished yet.
    // Written by the scanning thread and by the pool workers (Vector is synchronized).
    final Vector<String> inProgressList = new Vector<String>();

    private static String m_submitter;
    private static String m_default_submitter;
    private static long m_poll_interval;
    private static int m_pool_size;
    private static boolean m_recover = false;
    private static String m_local;
    private static String m_status;

    // Maximum number of queued jobs before scanning is paused. This number is kind of arbitrary;
    // 0 (the value main() currently sets) disables the check, see keepWorking().
    private static int threshold;

    // Only set when the submitJob.restartrun.* properties enable restarting, otherwise null.
    private static RestartRun restartRun = null;

    /**
     * Creates the fixed-size worker pool using the pool size that main() read from the
     * workbench properties.
     *
     * @throws Exception declared for callers; a non-positive pool size is rejected by the executor
     */
    public SubmitJob () throws Exception
    {
        threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(m_pool_size);
    }

    /**
     * Stops accepting new tasks and waits for the queued ones to finish. If they do not finish
     * within the given time the running tasks are cancelled and given 20 more seconds.
     * Does nothing when the pool was never created.
     *
     * @param count how long to wait for an orderly shutdown
     * @param units unit of <code>count</code>
     */
    synchronized static void
    shutdownAndAwaitTermination ( int count, TimeUnit units )
    {
        // main() calls this from its catch block, which can run before the constructor has
        // created the pool (e.g. a missing property). Without this guard that path died with a
        // NullPointerException thrown from inside the error handler.
        if (threadPool == null) {
            logger.debug("shutting down: thread pool was never created, nothing to do");
            return;
        }
        logger.debug("shutting down");
        threadPool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!threadPool.awaitTermination(count, units)) {
                threadPool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!threadPool.awaitTermination(20, TimeUnit.SECONDS)) {
                    logger.error("ThreadPool did not terminate");
                }
            }
        }
        catch ( InterruptedException ie ) {
            // (Re-)Cancel if current thread also interrupted
            threadPool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Main polling loop: optionally restarts runs, scans the database for tasks to submit and
     * sleeps for the poll interval. Returns after a single pass when the "recover" or "local"
     * system property is set; otherwise loops until an exception is thrown.
     *
     * @throws Exception anything thrown by the restart step, the scan, or the sleep
     */
    private void keepWorking () throws Exception
    {
        while (true) {
            if (restartRun != null) {
                restartRun.restartRuns();
            }

            // The queue is only inspected when a threshold is configured.
            int jobsQueued = (threshold == 0) ? 0 : threadPool.getQueue().size();
            if (threshold == 0 || jobsQueued < threshold) {
                scanAndProcess();
            }
            else {
                logger.warn("Thread pool has " + jobsQueued + " jobs queued, not queuing more until queue drains.");
            }

            // In recovery mode we only scan the database once. Otherwise if a resource is down we
            // would keep processing the same records over and over.
            if (m_recover || (m_local != null)) {
                return;
            }
            Thread.sleep(1000 * m_poll_interval);
        }
    }

    /**
     * Finds the tasks that are ready to submit for the configured submitter and queues a worker
     * for every job handle that is not already in progress.
     *
     * @return number of tasks returned by the database query (not the number actually queued)
     * @throws Exception if the query fails or the pool rejects a task
     */
    private int scanAndProcess () throws Exception
    {
        // select tasks with specified submitter and status that aren't locked. Pass in m_recover
        // instead of null if we care about recover versus initial attempt.
        List<RunningTask> list = RunningTask.findReadyToSubmit(m_submitter, null, false);

        if (!list.isEmpty() && logger.isDebugEnabled()) {
            StringBuilder found = new StringBuilder();
            for (RunningTask rt : list) {
                Object locked = rt.getLocked();
                found.append(rt.getJobhandle()).append('-')
                    .append(rt.getRunNumber()).append('-')
                    .append(rt.getStatus()).append('-')
                    .append(locked == null ? "" : locked.toString())
                    .append(", ");
            }
            logger.debug("Found " + list.size() + " tasks to process: " + found);
        }

        for (RunningTask rt : list) {
            String jobhandle = rt.getJobhandle();
            if (inProgressList.contains(jobhandle)) {
                continue;
            }
            // Register the handle BEFORE handing the task to the pool. The worker removes the
            // handle when it finishes; if it were added afterwards a fast worker could run its
            // remove first and leave a stale entry that blocks this job handle forever.
            inProgressList.add(jobhandle);
            boolean queued = false;
            try {
                threadPool.execute(new ProcessRunningTask(jobhandle, rt.getRunNumber()));
                queued = true;
            }
            finally {
                if (!queued) {
                    // The pool refused the task, so no worker will ever clear the entry.
                    inProgressList.remove(jobhandle);
                }
            }
        }
        return list.size();
    }

    /**
     * Worker that locks one running task, submits it if its status still matches the expected
     * one, and always unlocks it and clears its in-progress entry afterwards.
     */
    private class ProcessRunningTask implements Runnable
    {
        long m_taskId;
        String m_jobhandle;
        int m_runNumber;

        // The exact key that scanAndProcess() put into inProgressList. m_jobhandle is refreshed
        // from the database record in run(), so it is not used as the removal key.
        private final String m_queuedJobhandle;

        ProcessRunningTask ( String jobhandle, int runNumber )
        {
            m_jobhandle = jobhandle;
            m_queuedJobhandle = jobhandle;
            m_runNumber = runNumber;
        }

        public void run ()
        {
            long startTime = System.currentTimeMillis();
            NDC.push("[jh=" + m_jobhandle + ", rn=" + m_runNumber + "]");
            boolean gotLock = false;
            try {
                gotLock = RunningTask.lock(m_jobhandle, m_runNumber);
                if (gotLock) {
                    submitIfStillReady();
                }
                else {
                    logger.debug("Skipping " + m_jobhandle + " run " + m_runNumber + ". Already locked.");
                }
            }
            catch ( Exception e ) {
                logger.error("", e);
            }
            catch ( Error t ) {
                logger.error("THREAD IS DYING.", t);
                throw t;
            }
            finally {
                try {
                    if (gotLock) {
                        RunningTask.unlock(m_jobhandle, m_runNumber);
                    }
                }
                catch ( Exception e ) {
                    logger.debug("", e);
                }
                catch ( Error t ) {
                    logger.error("THREAD IS DYING.", t);
                    throw t;
                }
                finally {
                    // Runs even when unlock fails with an Error, so the handle never stays stuck
                    // in inProgressList and the logging context is always unwound.
                    long elapsedTime = System.currentTimeMillis() - startTime;
                    logger.debug("SubmitJob took " + elapsedTime + " ms, or " + elapsedTime / 1000 + " seconds.");
                    inProgressList.remove(m_queuedJobhandle);
                    NDC.pop();
                    NDC.remove();
                }
            }
        }

        /**
         * Re-reads the locked record and stages/submits it when its status is still the one
         * this process handles. Called only while the lock is held.
         */
        private void submitIfStillReady () throws Exception
        {
            RunningTask rt = RunningTask.find(m_jobhandle, m_runNumber);
            if (rt == null) {
                logger.error("Skipping " + m_jobhandle + " run " + m_runNumber + ". Record not found.");
                return;
            }
            m_taskId = rt.getTaskId();
            m_jobhandle = rt.getJobhandle();

            // Replace the provisional logging context with one that includes the task id.
            NDC.pop();
            NDC.push("[task=" + m_taskId + ", job=" + m_jobhandle + ", run=" + m_runNumber + "]");

            // It's possible someone else changed the status before we managed to lock the record.
            // m_status is the receiver so that a null status on the record is a skip, not an NPE.
            if (m_status.equals(rt.getStatus())) {
                logger.debug("Submitting job " + m_jobhandle + ", run " + m_runNumber);
                StageAndSubmitATask.stageAndSubmit(m_jobhandle, m_runNumber);
            }
            else {
                logger.debug(
                    "Skipping " + m_jobhandle + " run " + m_runNumber + ". Status isn't " + m_status + ", it's "
                        + rt.getStatus());
            }
        }
    }

    /**
     * Parses an optional integer property.
     *
     * @param value property text, may be null
     * @return 0 when the property is absent, otherwise the parsed value
     * @throws NumberFormatException if the text is present but not an integer
     */
    private static int parseIntOrZero ( String value )
    {
        return (value == null) ? 0 : Integer.parseInt(value);
    }

    /**
     * Builds the RestartRun helper from the submitJob.restartrun.* properties.
     * Restarting is enabled only when a positive minimum gap is configured together with a
     * positive run limit and/or a positive hour limit.
     *
     * @return the configured helper, or null when restarting is not enabled
     */
    private static RestartRun buildRestartRun ( String maxRestart, String minMinutes, String maxHours )
    {
        if ((maxRestart == null && maxHours == null) || minMinutes == null) {
            return null;
        }
        // Only one of maxRestart / maxHours has to be present, so the missing one must not be
        // parsed blindly (that used to throw NumberFormatException); it counts as 0.
        int maxRestartCount = parseIntOrZero(maxRestart);
        int minMinutesGap = parseIntOrZero(minMinutes);
        int maxHoursCount = parseIntOrZero(maxHours);
        if ((maxRestartCount <= 0 && maxHoursCount <= 0) || minMinutesGap <= 0) {
            return null;
        }
        RestartRun configured = new RestartRun();
        configured.setMaxRestart(maxRestartCount);
        configured.setMinMinutes(minMinutesGap);
        configured.setMaxHours(maxHoursCount);
        configured.setSubmitter(m_submitter);
        return configured;
    }

    /**
     * Program entry point. Reads configuration from the workbench properties and from the
     * "submitter", "recover" and "local" system properties, registers the ToolRegistryControl
     * MBean, unlocks records for this submitter, then polls for work until keepWorking() returns
     * or fails.
     */
    public static void main ( String[] args )
    {
        try {
            Workbench wb = Workbench.getInstance();

            m_default_submitter = wb.getProperties().getProperty("application.instance");
            logger.info("Default Submitter (application.instance) = " + m_default_submitter);

            String p = wb.getProperties().getProperty("submitJob.poll.seconds");
            if (p == null) {
                throw new Exception("Missing workbench property: submitJob.poll.seconds");
            }
            m_poll_interval = Long.parseLong(p);

            p = wb.getProperties().getProperty("submitJob.pool.size");
            if (p == null) {
                throw new Exception("Missing workbench property: submitJob.pool.size");
            }
            m_pool_size = Integer.parseInt(p);

            // 0 disables the queue-size check in keepWorking().
            threshold = 0;

            m_submitter = System.getProperty("submitter");
            logger.info("Submitter (System property: 'submitter') " + m_submitter);
            if (m_submitter == null) {
                throw new Exception("Missing property 'submitter'");
            }

            // Properties that control behavior when used in RecoverResults mode.
            // Todo: not using m_local yet. Also need to add ability to specify just a single task
            // or resource to process.
            m_recover = Boolean.getBoolean("recover");
            m_local = System.getProperty("local");
            m_status = RunningTask.STATUS_NEW;

            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName mxbeanName = new ObjectName("org.ngbw.utils:type=ToolRegistryControl");
            ToolRegistryControl mxbean = new ToolRegistryControl();
            mbs.registerMBean(mxbean, mxbeanName);

            SubmitJob lr = new SubmitJob();
            logger.debug("SUBMIT JOB: for submitter=" + m_submitter + ", poll_interval in seconds="
                + m_poll_interval + ", thread pool size=" + m_pool_size + ", max jobs queued = " + threshold
                + (m_recover ? ", Recovery Mode" : ", Normal Mode"));

            /*
             * If previous process died, unlock its records. Pass m_recover instead of null if we
             * care about initial versus retry.
             */
            RunningTask.unlockReadyToSubmit(m_submitter, null);

            restartRun = buildRestartRun(
                wb.getProperties().getProperty("submitJob.restartrun.maxruns"),
                wb.getProperties().getProperty("submitJob.restartrun.minminutes"),
                wb.getProperties().getProperty("submitJob.restartrun.maxhours"));

            /*
             * In recover mode, theoretically, keepWorking returns after one pass. We want to give
             * the threads plenty of time to complete their work after keepWorking has queued up
             * the jobs.
             */
            lr.keepWorking();
            shutdownAndAwaitTermination(24, TimeUnit.HOURS);
            logger.debug("SUBMIT JOB: exitting normally.");
        }
        /*
         * Main thread can catch an exception if, for example it isn't able to connect
         * to the db. If main thread exits but leaves the threadpool threads alive but
         * idle the process sticks around and doesn't do anything.
         */
        catch ( Throwable e ) {
            logger.error("Caught Exception. Calling shutdownAndAwaitTermination().", e);
            shutdownAndAwaitTermination(1, TimeUnit.MINUTES);
            logger.debug("SUBMIT JOB: exitting due to exception in main.");
            return;
        }
    }
}