package org.ngbw.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

import org.ngbw.sdk.database.ConnectionManager;
import org.ngbw.sdk.database.DriverConnectionSource;

/**
 * Command-line schema migration that introduces the task_runs table and a
 * RUN_NUMBER column on the tables that hang off of tasks and running_tasks.
 *
 * The steps run in a fixed order from {@link #main(String[])}: create
 * task_runs, copy one run record per existing tasks row, drop the migrated
 * columns from tasks, then add RUN_NUMBER (and the matching keys) to each
 * dependent table.
 *
 * The DDL uses MySQL-style syntax (DROP FOREIGN KEY, MODIFY COLUMN,
 * DROP INDEX ... ON). NOTE(review): on MySQL, DDL statements commit
 * implicitly, so the rollbacks issued here are best-effort and a failed
 * step may leave the schema partially migrated -- confirm before re-running.
 *
 * @author Paul Hoover
 *
 */
public class AddTaskRuns {

    // data fields

    private static final int FETCH_SIZE = 100;
    private static final int MAX_BATCH_SIZE = 100;

    // Schema that the information_schema lookups are restricted to.
    // NOTE(review): hard-coded; the connection's own catalog is not consulted.
    private static final String SCHEMA_NAME = "cipres";

    // Foreign key clause shared by every table that references task_runs.
    private static final String TASK_RUNS_FOREIGN_KEY =
        "ADD FOREIGN KEY (TASK_ID, RUN_NUMBER) REFERENCES task_runs (TASK_ID, RUN_NUMBER)";


    // public methods

    /**
     * Runs the whole migration against the connection supplied by
     * {@link DriverConnectionSource}. Prints "finished" on success; on any
     * failure prints the stack trace to standard error and exits with status 1.
     *
     * @param args ignored
     */
    public static void main ( String[] args )
    {
        try {
            ConnectionManager.setConnectionSource(new DriverConnectionSource());

            Connection dbConn = ConnectionManager.getConnectionSource().getConnection();

            try {
                dbConn.setAutoCommit(false);

                createTaskRunsTable(dbConn);
                populateTaskRuns(dbConn);
                alterTasksTable(dbConn);
                alterTaskInputsTable(dbConn);
                alterTaskOutputsTable(dbConn);
                alterTaskLogsTable(dbConn);
                alterRunningTasksTable(dbConn);
                alterRunningTasksParametersTable(dbConn);
                alterJobEventsTable(dbConn);
                alterJobStatsTable(dbConn);

                System.out.println("finished");
            }
            finally {
                dbConn.close();
            }
        }
        catch ( Exception err ) {
            err.printStackTrace(System.err);

            System.exit(1);
        }
    }


    // private methods

    /**
     * Creates the task_runs table, keyed by (TASK_ID, RUN_NUMBER) and
     * referencing tasks.
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if the statement or the commit fails
     */
    private static void createTaskRunsTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("creating task_runs table...");

        executeAndCommit(dbConn,
            "CREATE TABLE task_runs (" +
                "TASK_ID bigint NOT NULL, " +
                "RUN_NUMBER integer NOT NULL, " +
                "CREATION_DATE datetime NOT NULL, " +
                "STATUS integer DEFAULT 1 NOT NULL, " +
                "STAGE varchar(20) NOT NULL, " +
                "IS_TERMINAL tinyint(1) DEFAULT 0 NOT NULL, " +
                "IS_RESTART tinyint(1) DEFAULT 0 NOT NULL, " +
                "PRIMARY KEY (TASK_ID, RUN_NUMBER), " +
                "FOREIGN KEY (TASK_ID) REFERENCES tasks (TASK_ID) " +
            ")"
        );
    }

    /**
     * Writes one task_runs record, with RUN_NUMBER 1, for every row of tasks.
     * Rows are read through the given connection and written through a second
     * connection that is opened and closed here; inserts are committed in
     * batches of at most {@link #MAX_BATCH_SIZE}, so batches that were already
     * committed stay in place if a later one fails.
     *
     * @param selectConn connection used to read the tasks table
     * @throws SQLException if reading, inserting or committing fails
     */
    private static void populateTaskRuns ( Connection selectConn ) throws SQLException
    {
        Connection insertConn = ConnectionManager.getConnectionSource().getConnection();

        try {
            System.out.println("writing task_runs records...");

            insertConn.setAutoCommit(false);

            copyTasksToTaskRuns(selectConn, insertConn);
        }
        catch ( SQLException sqlErr ) {
            // discard whatever part of the current batch was not committed
            rollbackAfterFailure(insertConn, sqlErr);

            throw sqlErr;
        }
        finally {
            insertConn.close();
        }
    }

    /**
     * Streams the tasks rows from one connection into batched task_runs
     * inserts on the other. Each resource is closed in its own finally
     * block so that a failing close cannot prevent the remaining ones.
     *
     * @param selectConn connection used to read the tasks table
     * @param insertConn connection, with auto-commit disabled, used for the inserts
     * @throws SQLException if reading, inserting or committing fails
     */
    private static void copyTasksToTaskRuns ( Connection selectConn, Connection insertConn ) throws SQLException
    {
        PreparedStatement insertStmt = insertConn.prepareStatement(
            "INSERT INTO task_runs (TASK_ID, RUN_NUMBER, CREATION_DATE, STATUS, STAGE, IS_TERMINAL) VALUES (?, 1, ?, ?, ?, ?)"
        );

        try {
            Statement selectStmt = selectConn.createStatement();

            try {
                selectStmt.setFetchSize(FETCH_SIZE);

                ResultSet rows = selectStmt.executeQuery(
                    "SELECT TASK_ID, CREATION_DATE, OK, STAGE, IS_TERMINAL " +
                    "FROM tasks "
                );

                try {
                    int pending = 0;

                    while (rows.next()) {
                        long taskId = rows.getLong(1);
                        Timestamp creationDate = rows.getTimestamp(2);
                        int ok = rows.getInt(3);
                        String stage = rows.getString(4);
                        boolean isTerminal = rows.getBoolean(5);

                        insertStmt.setLong(1, taskId);
                        insertStmt.setTimestamp(2, creationDate);
                        // the old OK column becomes the STATUS of the first run
                        insertStmt.setInt(3, ok);
                        insertStmt.setString(4, stage);
                        insertStmt.setBoolean(5, isTerminal);
                        insertStmt.addBatch();

                        pending += 1;

                        if (pending >= MAX_BATCH_SIZE) {
                            insertStmt.executeBatch();
                            insertConn.commit();

                            pending = 0;
                        }
                    }

                    // flush the final, partially filled batch
                    if (pending > 0) {
                        insertStmt.executeBatch();
                        insertConn.commit();
                    }
                }
                finally {
                    rows.close();
                }
            }
            finally {
                selectStmt.close();
            }
        }
        finally {
            insertStmt.close();
        }
    }

    /**
     * Drops the OK, STAGE and IS_TERMINAL columns from tasks; their values
     * are copied into task_runs by {@link #populateTaskRuns(Connection)}.
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if the statement or the commit fails
     */
    private static void alterTasksTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering tasks table...");

        executeAndCommit(dbConn,
            "ALTER TABLE tasks " +
                "DROP COLUMN OK, " +
                "DROP COLUMN STAGE, " +
                "DROP COLUMN IS_TERMINAL"
        );
    }

    /**
     * Adds RUN_NUMBER to task_input_parameters and re-points its foreign
     * key from tasks to task_runs.
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterTaskInputsTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering task_input_parameters table...");

        addRunNumberColumn(dbConn, "task_input_parameters", null, "tasks", TASK_RUNS_FOREIGN_KEY);
    }

    /**
     * Adds RUN_NUMBER to task_output_parameters and re-points its foreign
     * key from tasks to task_runs.
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterTaskOutputsTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering task_output_parameters table...");

        addRunNumberColumn(dbConn, "task_output_parameters", null, "tasks", TASK_RUNS_FOREIGN_KEY);
    }

    /**
     * Adds RUN_NUMBER to task_log_messages, widens its primary key to
     * (TASK_ID, RUN_NUMBER, MESSAGE_INDEX) and re-points its foreign key
     * from tasks to task_runs.
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterTaskLogsTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering task_log_messages table...");

        addRunNumberColumn(dbConn, "task_log_messages", "TASK_ID, RUN_NUMBER, MESSAGE_INDEX", "tasks", TASK_RUNS_FOREIGN_KEY);
    }

    /**
     * Adds RUN_NUMBER to running_tasks and widens its primary key to
     * (JOBHANDLE, RUN_NUMBER).
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterRunningTasksTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering running_tasks table...");

        addRunNumberColumn(dbConn, "running_tasks", "JOBHANDLE, RUN_NUMBER", null, null);
    }

    /**
     * Adds RUN_NUMBER to running_tasks_parameters, widens its primary key
     * to (JOBHANDLE, RUN_NUMBER, NAME) and replaces its foreign key to
     * running_tasks with one on (JOBHANDLE, RUN_NUMBER).
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterRunningTasksParametersTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering running_tasks_parameters table...");

        addRunNumberColumn(dbConn, "running_tasks_parameters", "JOBHANDLE, RUN_NUMBER, NAME", "running_tasks",
            "ADD FOREIGN KEY (JOBHANDLE, RUN_NUMBER) REFERENCES running_tasks (JOBHANDLE, RUN_NUMBER)");
    }

    /**
     * Adds RUN_NUMBER to job_events and rebuilds the JOBHANDLE index so
     * that it covers (JOBHANDLE, RUN_NUMBER).
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterJobEventsTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering job_events table...");

        executeAndCommit(dbConn,
            "ALTER TABLE job_events " +
                "ADD COLUMN RUN_NUMBER integer NOT NULL DEFAULT 1",
            "ALTER TABLE job_events " +
                "MODIFY COLUMN RUN_NUMBER integer NOT NULL",
            "DROP INDEX JOBHANDLE ON job_events",
            "CREATE INDEX JOBHANDLE ON job_events (JOBHANDLE, RUN_NUMBER)"
        );
    }

    /**
     * Adds RUN_NUMBER to job_stats and widens its primary key to
     * (JOBHANDLE, RUN_NUMBER).
     *
     * @param dbConn connection with auto-commit disabled
     * @throws SQLException if any statement or the commit fails
     */
    private static void alterJobStatsTable ( Connection dbConn ) throws SQLException
    {
        System.out.println("altering job_stats table...");

        addRunNumberColumn(dbConn, "job_stats", "JOBHANDLE, RUN_NUMBER", null, null);
    }

    /**
     * Adds a RUN_NUMBER column to a table in two statements: the first adds
     * the column with DEFAULT 1 together with any key changes, the second
     * re-declares the column without the DEFAULT clause (presumably so that
     * existing rows receive run number 1 while new rows must supply one).
     *
     * @param dbConn connection with auto-commit disabled
     * @param tableName table to alter
     * @param primaryKeyColumns column list of the replacement primary key,
     *        or <code>null</code> to leave the primary key alone
     * @param oldParentTable table whose existing foreign key should be
     *        dropped if one is found; only read when
     *        <code>newForeignKeyClause</code> is not <code>null</code>
     * @param newForeignKeyClause complete ADD FOREIGN KEY clause, or
     *        <code>null</code> to leave foreign keys alone
     * @throws SQLException if the lookup, either statement or the commit fails
     */
    private static void addRunNumberColumn ( Connection dbConn, String tableName, String primaryKeyColumns,
        String oldParentTable, String newForeignKeyClause ) throws SQLException
    {
        StringBuilder alterBuilder = new StringBuilder();

        alterBuilder.append("ALTER TABLE ");
        alterBuilder.append(tableName);
        alterBuilder.append(" ADD COLUMN RUN_NUMBER integer NOT NULL DEFAULT 1");

        if (primaryKeyColumns != null) {
            alterBuilder.append(", DROP PRIMARY KEY, ADD PRIMARY KEY (");
            alterBuilder.append(primaryKeyColumns);
            alterBuilder.append(")");
        }

        if (newForeignKeyClause != null) {
            String oldKeyName = findForeignKeyName(dbConn, tableName, oldParentTable);

            // the old constraint is only dropped when the lookup actually finds one
            if (oldKeyName != null) {
                alterBuilder.append(", DROP FOREIGN KEY ");
                alterBuilder.append(oldKeyName);
            }

            alterBuilder.append(", ");
            alterBuilder.append(newForeignKeyClause);
        }

        executeAndCommit(dbConn,
            alterBuilder.toString(),
            "ALTER TABLE " + tableName + " MODIFY COLUMN RUN_NUMBER integer NOT NULL"
        );
    }

    /**
     * Looks up, in information_schema.REFERENTIAL_CONSTRAINTS, the name of a
     * foreign key constraint on one table that references another, limited
     * to the {@link #SCHEMA_NAME} schema.
     *
     * @param dbConn connection used for the lookup
     * @param tableName table that owns the constraint
     * @param referencedTableName table that the constraint points at
     * @return the name from the first matching row, or <code>null</code>
     *         when there is none; further matches are ignored
     * @throws SQLException if the query fails
     */
    private static String findForeignKeyName ( Connection dbConn, String tableName, String referencedTableName ) throws SQLException
    {
        PreparedStatement lookupStmt = dbConn.prepareStatement(
            "SELECT CONSTRAINT_NAME " +
            "FROM information_schema.REFERENTIAL_CONSTRAINTS " +
            "WHERE CONSTRAINT_SCHEMA = ? " +
                "AND TABLE_NAME = ? " +
                "AND REFERENCED_TABLE_NAME = ?"
        );

        try {
            lookupStmt.setString(1, SCHEMA_NAME);
            lookupStmt.setString(2, tableName);
            lookupStmt.setString(3, referencedTableName);

            ResultSet row = lookupStmt.executeQuery();

            try {
                if (row.next()) {
                    return row.getString(1);
                }

                return null;
            }
            finally {
                row.close();
            }
        }
        finally {
            lookupStmt.close();
        }
    }

    /**
     * Executes the given statements in order on one Statement and commits.
     * If any of them, or the commit, fails, the connection is rolled back
     * and the original exception is rethrown.
     *
     * @param dbConn connection with auto-commit disabled
     * @param sqlStatements update/DDL statements to run, in order
     * @throws SQLException if a statement or the commit fails
     */
    private static void executeAndCommit ( Connection dbConn, String... sqlStatements ) throws SQLException
    {
        Statement stmt = dbConn.createStatement();

        try {
            for (String sql : sqlStatements) {
                stmt.executeUpdate(sql);
            }

            dbConn.commit();
        }
        catch ( SQLException sqlErr ) {
            rollbackAfterFailure(dbConn, sqlErr);

            throw sqlErr;
        }
        finally {
            stmt.close();
        }
    }

    /**
     * Rolls the connection back after a failure without letting a rollback
     * error replace the failure that triggered it; a rollback error is
     * attached to the original exception's chain instead.
     *
     * @param dbConn connection to roll back
     * @param cause exception that made the rollback necessary
     */
    private static void rollbackAfterFailure ( Connection dbConn, SQLException cause )
    {
        try {
            dbConn.rollback();
        }
        catch ( SQLException rollbackErr ) {
            cause.setNextException(rollbackErr);
        }
    }
}