in contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java [106:313]
/**
 * Prepares all job-level and workflow-level SQL statements on
 * {@code connection}.
 *
 * <p>Statements that map one-to-one onto a job history event class are
 * registered in {@code entitySqlMap} keyed by that class. The workflow
 * statements and the end-of-job runtime roll-up are assigned to
 * {@code workflowSelectPS}, {@code workflowPS}, {@code workflowUpdateTimePS},
 * {@code workflowUpdateNumCompletedPS} and {@code jobEndUpdate}.
 *
 * @throws SQLException propagated from {@code connection.prepareStatement}
 */
private void initializeJobPreparedStatements() throws SQLException {
  // ---- Job events ----

  // JobSubmittedEvent: insert a new job row (8 columns, 8 placeholders).
  final String submittedSql =
      "INSERT INTO " + JOB_TABLE + " (" +
          "jobId, " + "jobName, " + "userName, " + "confPath, " +
          "queue, " + "submitTime, " + "workflowId, " +
          "workflowEntityName " +
          ") " + "VALUES" + " (?, ?, ?, ?, ?, ?, ?, ?)";
  entitySqlMap.put(
      JobSubmittedEvent.class, connection.prepareStatement(submittedSql));

  // Workflow lookup: fetch the stored workflowContext for a workflowId.
  final String workflowLookupSql =
      "SELECT workflowContext FROM " + WORKFLOW_TABLE + " where workflowId = ?";
  workflowSelectPS = connection.prepareStatement(workflowLookupSql);

  // Workflow insert: duration and numJobsCompleted are written as literal 0,
  // so only seven of the nine columns are bound through placeholders.
  final String workflowInsertSql =
      "INSERT INTO " + WORKFLOW_TABLE + " (" +
          "workflowId, " + "workflowName, " + "workflowContext, " +
          "userName, " + "startTime, " + "lastUpdateTime, " +
          "duration, " + "numJobsTotal, " + "numJobsCompleted" +
          ") " + "VALUES" + " (?, ?, ?, ?, ?, ?, 0, ?, 0)";
  workflowPS = connection.prepareStatement(workflowInsertSql);

  // Workflow refresh: new context, job total and update time; duration is
  // the bound value minus the row's existing startTime.
  final String workflowRefreshSql =
      "UPDATE " + WORKFLOW_TABLE + " SET " +
          "workflowContext = ?, " + "numJobsTotal = ?, " +
          "lastUpdateTime = ?, " + "duration = ? - startTime " +
          "WHERE workflowId = ?";
  workflowUpdateTimePS = connection.prepareStatement(workflowRefreshSql);

  // Workflow roll-up: recompute numJobsCompleted, inputBytes and outputBytes
  // from the workflow's jobs whose status is 'SUCCESS'. The target workflow
  // is found through the jobId bound to the last placeholder.
  final String workflowRollupSql =
      "UPDATE " + WORKFLOW_TABLE + " SET " +
          "lastUpdateTime = ?, " + "duration = ? - startTime, " +
          "numJobsCompleted = (" + "SELECT count(*)" +
          " FROM " + JOB_TABLE + " WHERE " +
          "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
          " AND status = 'SUCCESS'), " +
          "inputBytes = (" + "SELECT sum(inputBytes)" +
          " FROM " + JOB_TABLE + " WHERE " +
          "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
          " AND status = 'SUCCESS'), " +
          "outputBytes = (" + "SELECT sum(outputBytes)" +
          " FROM " + JOB_TABLE + " WHERE " +
          "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
          " AND status = 'SUCCESS') " +
          " WHERE workflowId = (SELECT workflowId FROM " + JOB_TABLE +
          " WHERE jobId = ?)";
  workflowUpdateNumCompletedPS = connection.prepareStatement(workflowRollupSql);

  // JobFinishedEvent: record finish time, task counters and byte counters.
  final String finishedSql =
      "UPDATE " + JOB_TABLE + " SET " +
          "finishTime = ?, " + "finishedMaps = ?, " + "finishedReduces= ?, " +
          "failedMaps = ?, " + "failedReduces = ?, " +
          "inputBytes = ?, " + "outputBytes = ? " +
          "WHERE " + "jobId = ?";
  entitySqlMap.put(
      JobFinishedEvent.class, connection.prepareStatement(finishedSql));

  // JobInitedEvent: record launch time, task totals and status.
  final String initedSql =
      "UPDATE " + JOB_TABLE + " SET " +
          "launchTime = ?, " + "maps = ?, " + "reduces = ?, " +
          "status = ? "+
          "WHERE " + "jobId = ?";
  entitySqlMap.put(
      JobInitedEvent.class, connection.prepareStatement(initedSql));

  // JobStatusChangedEvent: overwrite the status column only.
  final String statusChangedSql =
      "UPDATE " + JOB_TABLE + " SET " +
          "status = ? "+
          "WHERE " + "jobId = ?";
  entitySqlMap.put(
      JobStatusChangedEvent.class,
      connection.prepareStatement(statusChangedSql));

  // JobInfoChangeEvent: overwrite submit and launch times.
  final String infoChangedSql =
      "UPDATE " + JOB_TABLE + " SET " +
          "submitTime = ?, " + "launchTime = ? " +
          "WHERE " + "jobId = ?";
  entitySqlMap.put(
      JobInfoChangeEvent.class, connection.prepareStatement(infoChangedSql));

  // JobUnsuccessfulCompletionEvent: record finish time, finished task
  // counters and the resulting status.
  final String unsuccessfulSql =
      "UPDATE " + JOB_TABLE + " SET " +
          "finishTime = ?, " + "finishedMaps = ?, " +
          "finishedReduces = ?, " + "status = ? " +
          "WHERE " + "jobId = ?";
  entitySqlMap.put(
      JobUnsuccessfulCompletionEvent.class,
      connection.prepareStatement(unsuccessfulSql));

  // End-of-job roll-up: mapsRuntime and reducesRuntime are each the sum of
  // (finishTime - startTime) over this job's task attempts whose taskType
  // equals the value bound to the corresponding placeholder. Placeholder
  // order is: taskType for mapsRuntime, taskType for reducesRuntime, jobId.
  final String jobEndSql =
      "UPDATE " + JOB_TABLE + " SET " +
          " mapsRuntime = (" + "SELECT " + "SUM(" +
          TASKATTEMPT_TABLE + ".finishTime" + " - " +
          TASKATTEMPT_TABLE + ".startTime" + ")" +
          " FROM " + TASKATTEMPT_TABLE + " WHERE " +
          TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
          " AND " + TASKATTEMPT_TABLE + ".taskType = ?)" +
          ", " +
          " reducesRuntime = (" + "SELECT SUM(" +
          TASKATTEMPT_TABLE + ".finishTime" + " - " +
          TASKATTEMPT_TABLE + ".startTime" + ")" +
          " FROM " + TASKATTEMPT_TABLE + " WHERE " +
          TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
          " AND " + TASKATTEMPT_TABLE + ".taskType = ?) " +
          " WHERE " + "jobId = ?";
  jobEndUpdate = connection.prepareStatement(jobEndSql);
}