/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package org.ngbw.utils; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Set; import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.ngbw.sdk.Workbench; import org.ngbw.sdk.core.shared.TaskRunStage; import org.ngbw.sdk.database.ConnectionManager; import org.ngbw.sdk.database.RunStatus; import org.ngbw.sdk.database.Task; import org.ngbw.sdk.database.TaskRun; /** * * @author mzhuang */ public class RestartRun { private static final Log log = LogFactory.getLog(RestartRun.class.getName()); private static Connection m_dbConn = null; private int m_maxRestart = 0; private int m_minMinutes = 0; private int m_maxHours = 0; static { try { m_dbConn = ConnectionManager.getConnectionSource().getConnection(); } catch (SQLException sqlEx) { sqlEx.printStackTrace(); } } public void setMaxRestart(int maxRestart) { m_maxRestart = maxRestart; } public void setMinMinutes(int minMinutes) { m_minMinutes = minMinutes; } public void setMaxHours(int maxHours) { m_maxHours = maxHours; } public boolean restartRuns() { if (!((m_maxRestart > 0 || m_maxHours >0) && m_minMinutes > 0)) { log.error("m_maxRestart = " + m_maxRestart + " m_maxHours = " + m_maxHours + " m_minMinutes = " + m_minMinutes); log.error("minMinutes must be bigger than zero and one of maxRestart/maxHours must be bigger than zero. Since these criteria are not met, will not execute restart runs"); return false; } /* String stmt = "SELECT TASK_ID, MAX(RUN_NUMBER) MAX_RUN_NUMBER, OK, CREATION_DATE, STAGE, IS_TERMINAL FROM task_runs GROUP BY TASK_ID HAVING " + "OK = 0 AND MAX_RUN_NUMBER > 0 AND MAX_RUN_NUMBER < ? AND TIMESTAMPDIFF(MINUTE, CREATION_DATE, NOW()) >= ? " + "AND TIMESTAMPDIFF(HOUR, CREATION_DATE, NOW()) < ? " + "AND STAGE<>? AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? " + "AND IS_TERMINAL = 1"; */ String stmt = "SELECT TASK_ID, RUN_NUMBER, STATUS, CREATION_DATE, STAGE, IS_TERMINAL FROM task_runs t1 WHERE " + "STATUS in (?, ?) AND RUN_NUMBER > 0 " + (m_maxRestart > 0? "AND RUN_NUMBER < ? " : " ") + " AND TIMESTAMPDIFF(MINUTE, CREATION_DATE, NOW()) >= ? " + (m_maxHours >0? "AND TIMESTAMPDIFF(HOUR, CREATION_DATE, NOW()) < ? " : " ") + "AND STAGE<>? AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? AND STAGE <> ? " + "AND IS_TERMINAL = 1 " + "AND RUN_NUMBER in (SELECT MAX(RUN_NUMBER) FROM task_runs t2 WHERE t1.TASK_ID = t2.TASK_ID)"; PreparedStatement selectStmt = null; ResultSet resultRows = null; Set taskIDSet = new TreeSet(); try { selectStmt = m_dbConn.prepareStatement(stmt); int locCount = 1; selectStmt.setInt(locCount++, RunStatus.TEMPORARY_COMMU.value()); selectStmt.setInt(locCount++, RunStatus.TEMPORARY_PROBLEM.value()); if (m_maxRestart > 0) selectStmt.setInt(locCount++, m_maxRestart); selectStmt.setInt(locCount++, m_minMinutes); if (m_maxHours > 0) selectStmt.setInt(locCount++, m_maxHours); selectStmt.setString(locCount++, TaskRunStage.READY.toString()); selectStmt.setString(locCount++, TaskRunStage.SUBMITTED.toString()); selectStmt.setString(locCount++, TaskRunStage.RUNNING.toString()); selectStmt.setString(locCount++, TaskRunStage.LOAD_RESULTS.toString()); selectStmt.setString(locCount++, TaskRunStage.COMPLETED.toString()); resultRows = selectStmt.executeQuery(); log.info("About to restart job...."); while (resultRows.next()) { //is_terminal == 0 should be filtered out already. Put below here just in case //boolean isTerminal = resultRows.getBoolean(6); //log.info("isTerminal = " + isTerminal); //if (!isTerminal) // continue; Long taskID = resultRows.getLong(1); if (taskIDSet.contains(taskID)) continue; else taskIDSet.add(taskID); /* TaskRun restartRun = new TaskRun(m_dbConn, taskID, resultRows.getInt(2)).createClone(); restartRun.setRestart(true); restartRun.setStage(TaskRunStage.READY); */ Task task = new Task(taskID); TaskRun lastRun = task.runs().last(); log.info(resultRows.toString()); log.info("Task id is: " + resultRows.getInt(1) + " Run number is: " + resultRows.getInt(2) + " last Run number is: " + lastRun.getRunNumber()); if (lastRun.getRunNumber()> resultRows.getInt(2)) continue; TaskRun restartRun = lastRun.createClone(); //restartRun.setRestart(true); restartRun.setRestart(false); restartRun.setStatus(RunStatus.RESUBMITTED); restartRun.setStage(TaskRunStage.READY); task.runs().add(restartRun); task.save(); Workbench.getInstance().submitTask(task, false); log.info("Added a run with job id: " + taskID); } } catch (SQLException sqlEx) { sqlEx.printStackTrace(); return false; } catch (IOException ioEx) { ioEx.printStackTrace(); return false; } catch (Exception ex) { ex.printStackTrace(); return false; } finally { try { if (resultRows != null) resultRows.close(); if (selectStmt != null) selectStmt.close(); } catch (SQLException sqlEx2) { sqlEx2.printStackTrace(); } } return true; } }