package org.ngbw.utils;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.List;
import org.ngbw.sdk.Workbench;
import org.ngbw.sdk.database.RunningTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.NDC;
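/**
 * Standalone program that submits jobs for running_tasks rows whose status is
 * RunningTask.STATUS_NEW.
 *
 * Workbench properties (required): submitJob.poll.seconds = seconds to wait between database
 * scans; submitJob.pool.size = number of worker threads.
 *
 * System properties:
 * submitter = only submit jobs for this submitter id (see the submitter column in the db; it is
 * normally the url of the web site that created the job). Required.
 *
 * recover = if "true", handle entries that have already been tried once. The intent is to use a
 * separate process for first tries, so that they get priority over jobs that have already failed
 * in case something bogs those jobs down or means they will never succeed. NOT IMPLEMENTED YET:
 * all jobs with status NEW are handled regardless of the ATTEMPTS field. Currently "recover" (and
 * the "local" property) only make the program scan the database once and then exit.
 */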
public class SubmitJob
{
    private static final Log log = LogFactory.getLog(SubmitJob.class.getName());

    /** Worker pool, one ProcessRunningTask per job; created by the constructor, null until then. */
    private static ThreadPoolExecutor threadPool = null;

    /*
     * Job handles that have been queued on the thread pool and whose worker has not finished yet.
     * The polling thread adds entries and every worker thread removes its own entry, so the list
     * must be thread-safe (a plain ArrayList can be corrupted by those concurrent updates).
     */
    List<String> inProgressList = new CopyOnWriteArrayList<String>();

    /** Value of the "submitter" system property; only tasks for this submitter are processed. */
    private static String m_submitter;
    /** Workbench "application.instance" property; read at startup but not otherwise used here. */
    private static String m_default_submitter;
    /** Seconds to sleep between database scans (workbench property submitJob.poll.seconds). */
    private static long m_poll_interval;
    /** Number of worker threads (workbench property submitJob.pool.size). */
    private static int m_pool_size;
    /** "recover" system property; when true, keepWorking() makes a single pass and returns. */
    private static boolean m_recover = false;
    /** "local" system property; when set, keepWorking() makes a single pass. Otherwise unused. */
    private static String m_local;
    /** Status a task must still have, once locked, for this program to submit it. */
    private static String m_status;
    /*
     * Maximum number of jobs allowed to wait in the pool's queue before scanning pauses; 0 means
     * "no limit". The value is somewhat arbitrary (a commented-out alternative was pool size * 4)
     * and is currently set to 0 in main().
     */
    private static int threshold;

    /**
     * Creates the worker pool. Must be called after m_pool_size has been read from the
     * workbench properties.
     */
    public SubmitJob () throws Exception
    {
        threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(m_pool_size);
    }

    /**
     * Entry point: reads configuration, unlocks records left locked by a previous process, then
     * polls the database and submits NEW tasks until keepWorking() returns (single-pass modes
     * only) or an exception escapes. In both cases the worker pool is shut down before exit.
     */
    public static void main ( String[] args )
    {
        try
        {
            Workbench wb = Workbench.getInstance();
            // This overrides db connection pool established by Workbench.
            //ConnectionManager.getConnectionSource().close();
            //ConnectionManager.setConnectionSource(new DriverConnectionSource());
            m_default_submitter = wb.getProperties().getProperty("application.instance");

            // trim(): trailing whitespace in a properties file would otherwise fail to parse.
            m_poll_interval = Long.parseLong(requiredWorkbenchProperty(wb, "submitJob.poll.seconds").trim());
            m_pool_size = Integer.parseInt(requiredWorkbenchProperty(wb, "submitJob.pool.size").trim());
            threshold = 0;

            m_submitter = System.getProperty("submitter");
            if (m_submitter == null)
            {
                throw new Exception("Missing system property submitter");
            }
            // Properties that control behavior when used in RecoverResults mode.
            // Todo: not using m_local yet. Also need to add ability to specify just a single task or
            // resource to process.
            m_recover = Boolean.getBoolean("recover");
            m_local = System.getProperty("local");
            m_status = RunningTask.STATUS_NEW;

            SubmitJob lr = new SubmitJob();
            log.debug("SUBMIT JOB: for submitter=" + m_submitter + ", poll_interval in seconds="
                + m_poll_interval + ", thread pool size=" + m_pool_size + ", max jobs queued = " + threshold
                + (m_recover ? ", Recovery Mode" : ", Normal Mode"));

            /*
             * If a previous process died, unlock its records. Pass m_recover instead of null if we
             * care about initial versus retry.
             */
            RunningTask.unlockReadyToSubmit(m_submitter, null);

            /*
             * In single-pass mode keepWorking returns after one scan. Give the threads plenty of
             * time to finish the jobs it queued.
             */
            lr.keepWorking();
            shutdownAndAwaitTermination(24, TimeUnit.HOURS);
            log.debug("SUBMIT JOB: exitting normally.");
        }
        /*
         * The main thread can catch an exception if, for example, it can't connect to the db. If
         * main exits but leaves the pool's threads alive and idle, the process sticks around and
         * does nothing, so the pool is always shut down here.
         */
        catch ( Throwable e )
        {
            log.error("Caught Exception. Calling shutdownAndAwaitTermination().", e);
            shutdownAndAwaitTermination(1, TimeUnit.MINUTES);
            log.debug("SUBMIT JOB: exitting due to exception in main.");
        }
    }

    /**
     * Returns a required workbench property.
     *
     * @throws Exception if the property is not set
     */
    private static String requiredWorkbenchProperty ( Workbench wb, String name ) throws Exception
    {
        String value = wb.getProperties().getProperty(name);
        if (value == null)
        {
            throw new Exception("Missing workbench property: " + name);
        }
        return value;
    }

    /**
     * Stops accepting new jobs and waits up to count/units for queued and running jobs to finish.
     * After that it interrupts them and waits another 20 seconds. Safe to call when the pool was
     * never created (e.g. startup failed before the constructor ran).
     */
    static void shutdownAndAwaitTermination ( int count, TimeUnit units )
    {
        log.debug("shutting down");
        if (threadPool == null)
        {
            // Startup failed before the pool existed; nothing to shut down.
            return;
        }
        threadPool.shutdown(); // Disable new tasks from being submitted
        try
        {
            // Wait a while for existing tasks to terminate
            if (!threadPool.awaitTermination(count, units))
            {
                threadPool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!threadPool.awaitTermination(20, TimeUnit.SECONDS))
                {
                    log.error("ThreadPool did not terminate");
                }
            }
        }
        catch ( InterruptedException ie )
        {
            // (Re-)Cancel if current thread also interrupted
            threadPool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Polling loop: scans the database and queues work every m_poll_interval seconds. Returns
     * after a single pass when m_recover is true or m_local is set; otherwise loops until
     * interrupted or an exception escapes.
     */
    private void keepWorking () throws Exception
    {
        while (true)
        {
            int jobsQueued = threadPool.getQueue().size();
            if (threshold == 0 || jobsQueued < threshold)
            {
                scanAndProcess();
            }
            else
            {
                log.warn("Thread pool has " + jobsQueued + " jobs queued, not queuing more until queue drains.");
            }
            // In recovery mode we only scan the database once. Otherwise if a resource is down we
            // would keep processing the same records over and over.
            if (m_recover || (m_local != null))
            {
                return;
            }
            TimeUnit.SECONDS.sleep(m_poll_interval);
        }
    }

    /**
     * Finds unlocked tasks for m_submitter that are ready to submit, and queues one worker per
     * task that is not already queued or running.
     *
     * @return the number of tasks found (including ones skipped because already in progress)
     */
    private int scanAndProcess () throws Exception
    {
        // Pass in m_recover instead of null if we care about recover versus initial attempt.
        List<RunningTask> list = RunningTask.findReadyToSubmit(m_submitter, null, false);
        if (!list.isEmpty())
        {
            StringBuilder summary = new StringBuilder();
            for (RunningTask rt : list)
            {
                summary.append(rt.getJobhandle()).append('-').append(rt.getStatus()).append('-')
                    .append(rt.getLocked() == null ? "" : rt.getLocked().toString()).append(", ");
            }
            log.debug("Found " + list.size() + " tasks to process: " + summary);
        }
        for (RunningTask rt : list)
        {
            String jobhandle = rt.getJobhandle();
            if (!inProgressList.contains(jobhandle))
            {
                // Record the handle BEFORE queuing the worker. If the worker finished first, its
                // remove() would run before our add() and the handle would stay in the list
                // forever, so the job would never be picked up again.
                inProgressList.add(jobhandle);
                try
                {
                    threadPool.execute(new ProcessRunningTask(jobhandle));
                }
                catch ( RuntimeException e )
                {
                    // Rejected (e.g. pool shutting down): the worker will never remove it.
                    inProgressList.remove(jobhandle);
                    throw e;
                }
            }
        }
        return list.size();
    }

    /**
     * Worker that locks one running_task record and submits it via StageAndSubmitATask. It skips
     * the record if another process holds the lock or the status changed. It always unlocks the
     * record it locked and removes its handle from inProgressList.
     */
    private class ProcessRunningTask implements Runnable
    {
        private final String m_jobhandle;
        private long m_taskId;

        ProcessRunningTask ( String jobhandle )
        {
            m_jobhandle = jobhandle;
        }

        @Override
        public void run ()
        {
            long startTime = System.currentTimeMillis();
            NDC.push("[jh =" + m_jobhandle + "]");
            boolean gotLock = false;
            try
            {
                gotLock = RunningTask.lock(m_jobhandle);
                if (gotLock)
                {
                    RunningTask rt = RunningTask.find(m_jobhandle);
                    m_taskId = rt.getTaskId();
                    NDC.pop();
                    NDC.push("[task=" + m_taskId + ", job=" + m_jobhandle + "]");
                    // Someone else may have changed the status before we managed to lock the record.
                    if (!m_status.equals(rt.getStatus()))
                    {
                        log.debug("Skipping " + m_jobhandle + ". Status isn't " + m_status + ", it's "
                            + rt.getStatus());
                    }
                    else
                    {
                        log.debug("Submitting job " + m_jobhandle);
                        StageAndSubmitATask.stageAndSubmit(m_jobhandle);
                    }
                }
                else
                {
                    log.debug("Skipping " + m_jobhandle + ". Already locked.");
                }
            }
            catch ( Exception e )
            {
                log.error("", e);
            }
            catch ( Error t )
            {
                log.error("THREAD IS DYING.", t);
                throw t;
            }
            finally
            {
                try
                {
                    if (gotLock)
                    {
                        RunningTask.unlock(m_jobhandle);
                    }
                }
                catch ( Exception e )
                {
                    // A failed unlock leaves the record locked until the next process start.
                    log.warn("", e);
                }
                catch ( Error t )
                {
                    log.error("THREAD IS DYING.", t);
                    throw t;
                }
                finally
                {
                    // Runs even if unlock() threw an Error, so the job can be rescheduled later.
                    long elapsedTime = System.currentTimeMillis() - startTime;
                    log.debug("SubmitJob took " + elapsedTime + " ms, or " + elapsedTime / 1000 + " seconds.");
                    inProgressList.remove(m_jobhandle);
                    NDC.pop();
                    NDC.remove();
                }
            }
        }
    }
}