package org.ngbw.utils;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Set;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ngbw.sdk.Workbench;
import org.ngbw.sdk.core.shared.TaskRunStage;
import org.ngbw.sdk.database.ConnectionManager;
import org.ngbw.sdk.database.RunStatus;
import org.ngbw.sdk.database.Statistics;
import org.ngbw.sdk.database.Task;
import org.ngbw.sdk.database.TaskRun;

/**
 * Finds task runs that ended with a temporary failure status and resubmits
 * them through the {@link Workbench}.
 *
 * <p>Configure the selection window with the setters, then call
 * {@link #restartRuns()}. The class uses a single static database connection
 * that is opened when the class is loaded and is never closed by this class.
 *
 * @author mzhuang
 */
public class RestartRun
{
    private static final Log log = LogFactory.getLog(RestartRun.class.getName());

    /** Shared connection; stays {@code null} if it could not be opened at class-load time. */
    private static Connection m_dbConn = null;

    private int m_maxRestart = 0;
    private int m_minMinutes = 0;
    private int m_maxHours = 0;
    private String m_submitter = null;

    static
    {
        try
        {
            m_dbConn = ConnectionManager.getConnectionSource().getConnection();
        }
        catch ( SQLException sqlEx )
        {
            // Leave m_dbConn null; restartRuns() reports the problem and returns false.
            log.error("Unable to obtain a database connection for RestartRun", sqlEx);
        }
    }

    /**
     * Sets the exclusive upper bound on the run number of runs eligible for
     * restart. A value of zero or less disables this filter.
     *
     * @param maxRestart upper bound (exclusive) on RUN_NUMBER
     */
    public void setMaxRestart ( int maxRestart )
    {
        m_maxRestart = maxRestart;
    }

    /**
     * Sets the minimum age, in minutes, a run must have before it is
     * considered for restart. Must be greater than zero for
     * {@link #restartRuns()} to do any work.
     *
     * @param minMinutes minimum minutes since the run's CREATION_DATE
     */
    public void setMinMinutes ( int minMinutes )
    {
        m_minMinutes = minMinutes;
    }

    /**
     * Sets the exclusive upper bound, in hours, on the age of runs eligible
     * for restart. A value of zero or less disables this filter.
     *
     * @param maxHours upper bound (exclusive) on hours since CREATION_DATE
     */
    public void setMaxHours ( int maxHours )
    {
        m_maxHours = maxHours;
    }

    /**
     * Sets the submitter whose runs should be restarted. Only tasks whose
     * statistics record names exactly this submitter are resubmitted; when
     * this is {@code null} no task matches.
     *
     * @param submitter submitter name to match
     */
    public void setSubmitter ( String submitter )
    {
        this.m_submitter = submitter;
    }

    /**
     * Selects the most recent, terminal run of every task whose status is
     * {@code TEMPORARY_COMMU} or {@code TEMPORARY_PROBLEM} and whose stage is
     * none of READY, SUBMITTED, RUNNING, LOAD_RESULTS or COMPLETED, then
     * clones that run, marks the clone RESUBMITTED/READY, saves the task and
     * submits it to the workbench.
     *
     * <p>A task is skipped when a newer run than the selected one already
     * exists, when no statistics record is found for it, or when its recorded
     * submitter does not equal the configured submitter.
     *
     * @return {@code true} if the selection and all resubmissions completed
     *         without an exception; {@code false} if the configuration is
     *         invalid, no database connection is available, or any exception
     *         occurred (processing stops at the first exception)
     */
    public boolean restartRuns ()
    {
        boolean hasUpperBound = m_maxRestart > 0 || m_maxHours > 0;
        if (!hasUpperBound || m_minMinutes <= 0)
        {
            log.error("m_maxRestart = " + m_maxRestart + " m_maxHours = " + m_maxHours
                      + " m_minMinutes = " + m_minMinutes);
            log.error("minMinutes must be bigger than zero and one of maxRestart/maxHours must be bigger than zero. "
                      + "Since these criteria are not met, will not execute restart runs");
            return false;
        }

        if (m_dbConn == null)
        {
            log.error("No database connection is available, will not execute restart runs");
            return false;
        }

        PreparedStatement selectStmt = null;
        ResultSet resultRows = null;
        Set<Long> seenTaskIds = new TreeSet<Long>();

        try
        {
            selectStmt = m_dbConn.prepareStatement(buildSelectSql());
            bindSelectParameters(selectStmt);
            resultRows = selectStmt.executeQuery();

            // Rows with IS_TERMINAL = 0 are already excluded by the query itself.
            while (resultRows.next())
            {
                Long taskID = resultRows.getLong(1);
                int selectedRunNumber = resultRows.getInt(2);

                // Handle each task at most once, even if the query returns it twice.
                if (!seenTaskIds.add(taskID))
                {
                    continue;
                }

                Task task = new Task(taskID);
                TaskRun lastRun = task.runs().last();

                // A newer run appeared since the row was selected; leave the task alone.
                if (lastRun.getRunNumber() > selectedRunNumber)
                {
                    continue;
                }

                Statistics stat = Statistics.find(task.getJobHandle(), lastRun.getRunNumber());
                if (stat == null)
                {
                    log.error("Cannot find submitter information for taskID: " + taskID
                              + ". Skipping task: " + taskID);
                    continue;
                }

                // Null-safe: a missing recorded submitter never matches and is skipped
                // instead of aborting the whole batch with a NullPointerException.
                String recordedSubmitter = stat.getSubmitter();
                if (recordedSubmitter == null || !recordedSubmitter.equals(m_submitter))
                {
                    continue;
                }

                TaskRun restartRun = lastRun.createClone();
                // NOTE(review): the restart flag is intentionally false here; an earlier
                // revision of this code passed true - confirm before changing.
                restartRun.setRestart(false);
                restartRun.setStatus(RunStatus.RESUBMITTED);
                restartRun.setStage(TaskRunStage.READY);
                task.runs().add(restartRun);
                task.save();
                Workbench.getInstance().submitTask(task, false);

                log.info("Restarted a run with job id: " + taskID + " and run number = "
                         + restartRun.getRunNumber());
            }
        }
        catch ( SQLException sqlEx )
        {
            log.error("Database error while restarting runs", sqlEx);
            return false;
        }
        catch ( IOException ioEx )
        {
            log.error("I/O error while restarting runs", ioEx);
            return false;
        }
        catch ( Exception ex )
        {
            log.error("Unexpected error while restarting runs", ex);
            return false;
        }
        finally
        {
            // Close each resource on its own so one failure cannot leak the other.
            closeQuietly(resultRows);
            closeQuietly(selectStmt);
        }

        return true;
    }

    /**
     * Builds the SELECT statement. The RUN_NUMBER upper bound and the HOUR
     * age bound are only included when the matching setting is positive, so
     * the placeholder order must stay in step with
     * {@link #bindSelectParameters(PreparedStatement)}.
     */
    private String buildSelectSql ()
    {
        return "SELECT TASK_ID, RUN_NUMBER, STATUS, CREATION_DATE, STAGE, IS_TERMINAL FROM task_runs t1 WHERE "
               + "STATUS in (?, ?) AND RUN_NUMBER > 0 "
               + (m_maxRestart > 0 ? "AND RUN_NUMBER < ? " : " ")
               + " AND TIMESTAMPDIFF(MINUTE, CREATION_DATE, NOW()) >= ? "
               + (m_maxHours > 0 ? "AND TIMESTAMPDIFF(HOUR, CREATION_DATE, NOW()) < ? " : " ")
               + "AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? "
               + "AND IS_TERMINAL = 1 "
               + "AND RUN_NUMBER in (SELECT MAX(RUN_NUMBER) FROM task_runs t2 WHERE t1.TASK_ID = t2.TASK_ID)";
    }

    /**
     * Binds the placeholders of the statement produced by
     * {@link #buildSelectSql()}, in the same order they appear in the SQL.
     */
    private void bindSelectParameters ( PreparedStatement selectStmt ) throws SQLException
    {
        int position = 1;

        selectStmt.setInt(position++, RunStatus.TEMPORARY_COMMU.value());
        selectStmt.setInt(position++, RunStatus.TEMPORARY_PROBLEM.value());

        if (m_maxRestart > 0)
        {
            selectStmt.setInt(position++, m_maxRestart);
        }

        selectStmt.setInt(position++, m_minMinutes);

        if (m_maxHours > 0)
        {
            selectStmt.setInt(position++, m_maxHours);
        }

        selectStmt.setString(position++, TaskRunStage.READY.toString());
        selectStmt.setString(position++, TaskRunStage.SUBMITTED.toString());
        selectStmt.setString(position++, TaskRunStage.RUNNING.toString());
        selectStmt.setString(position++, TaskRunStage.LOAD_RESULTS.toString());
        selectStmt.setString(position++, TaskRunStage.COMPLETED.toString());
    }

    /** Closes the result set if it is non-null, logging (not propagating) any failure. */
    private static void closeQuietly ( ResultSet rows )
    {
        if (rows == null)
        {
            return;
        }

        try
        {
            rows.close();
        }
        catch ( SQLException sqlEx )
        {
            log.error("Failed to close result set", sqlEx);
        }
    }

    /** Closes the statement if it is non-null, logging (not propagating) any failure. */
    private static void closeQuietly ( PreparedStatement statement )
    {
        if (statement == null)
        {
            return;
        }

        try
        {
            statement.close();
        }
        catch ( SQLException sqlEx )
        {
            log.error("Failed to close statement", sqlEx);
        }
    }
}