private void initializeJobPreparedStatements()

in contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java [106:313]


  private void initializeJobPreparedStatements() throws SQLException {

    /** 
     * Job events
     */

    // JobSubmittedEvent

    PreparedStatement jobSubmittedPrepStmnt =
        connection.prepareStatement(
            "INSERT INTO " + 
                JOB_TABLE + 
                " (" +
                "jobId, " +
                "jobName, " +
                "userName, " +
                "confPath, " +
                "queue, " +
                "submitTime, " +
                "workflowId, " +
                "workflowEntityName " +
                ") " +
                "VALUES" +
                " (?, ?, ?, ?, ?, ?, ?, ?)"
            );
    entitySqlMap.put(JobSubmittedEvent.class, jobSubmittedPrepStmnt);
    
    workflowSelectPS =
        connection.prepareStatement(
            "SELECT workflowContext FROM " + WORKFLOW_TABLE + " where workflowId = ?"
            );

    workflowPS = 
        connection.prepareStatement(
            "INSERT INTO " +
                WORKFLOW_TABLE +
                " (" +
                "workflowId, " +
                "workflowName, " +
                "workflowContext, " +
                "userName, " +
                "startTime, " +
                "lastUpdateTime, " +
                "duration, " +
                "numJobsTotal, " +
                "numJobsCompleted" +
                ") " +
                "VALUES" +
                " (?, ?, ?, ?, ?, ?, 0, ?, 0)"
            );
    
    workflowUpdateTimePS =
        connection.prepareStatement(
            "UPDATE " +
                WORKFLOW_TABLE +
                " SET " +
                "workflowContext = ?, " +
                "numJobsTotal = ?, " +
                "lastUpdateTime = ?, " +
                "duration = ? - startTime " +
                "WHERE workflowId = ?"
            );
    
    workflowUpdateNumCompletedPS =
        connection.prepareStatement(
            "UPDATE " +
                WORKFLOW_TABLE +
                " SET " +
                "lastUpdateTime = ?, " +
                "duration = ? - startTime, " +
                "numJobsCompleted = (" +
                  "SELECT count(*)" +
                  " FROM " +
                  JOB_TABLE +
                  " WHERE " +
                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
                  " AND status = 'SUCCESS'), " +
                "inputBytes = (" +
                  "SELECT sum(inputBytes)" +
                  " FROM " +
                  JOB_TABLE +
                  " WHERE " +
                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
                  " AND status = 'SUCCESS'), " +
                "outputBytes = (" +
                  "SELECT sum(outputBytes)" +
                  " FROM " +
                  JOB_TABLE +
                  " WHERE " +
                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
                  " AND status = 'SUCCESS') " +
                " WHERE workflowId = (SELECT workflowId FROM " +
                JOB_TABLE +
                " WHERE jobId = ?)"
            );
    
    // JobFinishedEvent

    PreparedStatement jobFinishedPrepStmnt = 
        connection.prepareStatement(
            "UPDATE " +
                JOB_TABLE +
                " SET " +
                "finishTime = ?, " +
                "finishedMaps = ?, " +
                "finishedReduces= ?, " +
                "failedMaps = ?, " +
                "failedReduces = ?, " +
                "inputBytes = ?, " +
                "outputBytes = ? " +
                "WHERE " +
                "jobId = ?" 
            );
    entitySqlMap.put(JobFinishedEvent.class, jobFinishedPrepStmnt);

    // JobInitedEvent
    
    PreparedStatement jobInitedPrepStmnt = 
        connection.prepareStatement(
            "UPDATE " +
                JOB_TABLE +
                " SET " +
                "launchTime = ?, " +
                "maps = ?, " +
                "reduces = ?, " +
                "status = ? "+
                "WHERE " +
                "jobId = ?" 
            );
    entitySqlMap.put(JobInitedEvent.class, jobInitedPrepStmnt);

    // JobStatusChangedEvent
    
    PreparedStatement jobStatusChangedPrepStmnt = 
        connection.prepareStatement(
            "UPDATE " +
                JOB_TABLE +
                " SET " +
                "status = ? "+
                "WHERE " +
                "jobId = ?" 
            );
    entitySqlMap.put(JobStatusChangedEvent.class, jobStatusChangedPrepStmnt);

    // JobInfoChangedEvent
    
    PreparedStatement jobInfoChangedPrepStmnt = 
        connection.prepareStatement(
            "UPDATE " +
                JOB_TABLE +
                " SET " +
                "submitTime = ?, " +
                "launchTime = ? " +
                "WHERE " +
                "jobId = ?" 
            );
    entitySqlMap.put(JobInfoChangeEvent.class, jobInfoChangedPrepStmnt);

    // JobUnsuccessfulCompletionEvent
    PreparedStatement jobUnsuccessfulPrepStmnt = 
        connection.prepareStatement(
            "UPDATE " +
                JOB_TABLE +
                " SET " +
                "finishTime = ?, " +
                "finishedMaps = ?, " +
                "finishedReduces = ?, " +
                "status = ? " +
                "WHERE " +
                "jobId = ?" 
            );
    entitySqlMap.put(
        JobUnsuccessfulCompletionEvent.class, jobUnsuccessfulPrepStmnt);

    // Job update at the end
    jobEndUpdate =
        connection.prepareStatement(
            "UPDATE " +
              JOB_TABLE +
              " SET " +
              " mapsRuntime = (" +
                "SELECT " +
                "SUM(" + 
                      TASKATTEMPT_TABLE + ".finishTime" +  " - " + 
                      TASKATTEMPT_TABLE + ".startTime" +
                		  ")" +
                " FROM " +
                TASKATTEMPT_TABLE + 
                " WHERE " +
                TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
                " AND " +
                TASKATTEMPT_TABLE + ".taskType = ?)" +
              ", " +
              " reducesRuntime = (" +
                "SELECT SUM(" + 
                             TASKATTEMPT_TABLE + ".finishTime" +  " - " + 
                             TASKATTEMPT_TABLE + ".startTime" +
                		        ")" +
              	" FROM " +
                TASKATTEMPT_TABLE + 
                " WHERE " +
                TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
                " AND " +
                TASKATTEMPT_TABLE + ".taskType = ?) " +
              " WHERE " +
                 "jobId = ?"
            );
  }