private void processJobLine()

in hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java [746:956]


  private void processJobLine(ParsedLine line) throws JsonProcessingException,
      IOException {
    try {
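      // versions 0 and 1 are the old (pre-0.21) job history formats; note
      // the Pre21JobHistoryConstants use below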
      if (version == 0 || version == 1) {
        // determine the job type if this is the declaration line
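        // each field is the value for that key on this history line, or
        // null when the line does not carry the key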
        String jobID = line.get("JOBID");

        String user = line.get("USER");

        String jobPriority = line.get("JOB_PRIORITY");

        String submitTime = line.get("SUBMIT_TIME");

        String jobName = line.get("JOBNAME");

        String launchTime = line.get("LAUNCH_TIME");

        String finishTime = line.get("FINISH_TIME");

        String status = line.get("JOB_STATUS");

        String totalMaps = line.get("TOTAL_MAPS");

        String totalReduces = line.get("TOTAL_REDUCES");

        /*
         * If the job appears new [the ID is different from the most recent one,
         * if any] we make a new LoggedJob.
         */
        if (jobID != null
            && jobTraceGen != null
            && (jobBeingTraced == null 
                || !jobID.equals(jobBeingTraced.getJobID().toString()))) {
          // push out the old job if there is one, even though it didn't get
          // mated with a conf

          finalizeJob();

          jobBeingTraced = new LoggedJob(jobID);

          tasksInCurrentJob = new HashMap<String, LoggedTask>();
          attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();

          // initialize all the per-job statistics gathering places
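          // the map-attempt arrays hold one histogram per locality distance,
          // plus one extra slot (presumably for attempts whose distance
          // cannot be determined)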
          successfulMapAttemptTimes =
              new Histogram[ParsedHost.numberOfDistances() + 1];
          for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
            successfulMapAttemptTimes[i] = new Histogram();
          }

          successfulReduceAttemptTimes = new Histogram();
          failedMapAttemptTimes =
              new Histogram[ParsedHost.numberOfDistances() + 1];
          for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
            failedMapAttemptTimes[i] = new Histogram();
          }

          failedReduceAttemptTimes = new Histogram();
          successfulNthMapperAttempts = new Histogram();
          successfulNthReducerAttempts = new Histogram();
          mapperLocality = new Histogram();
        }

        // here we fill in all the stuff the trace might need
        if (jobBeingTraced != null) {
          if (user != null) {
            jobBeingTraced.setUser(user);
          }

          if (jobPriority != null) {
            jobBeingTraced.setPriority(LoggedJob.JobPriority
                .valueOf(jobPriority));
          }

          if (totalMaps != null) {
            jobBeingTraced.setTotalMaps(Integer.parseInt(totalMaps));
          }

          if (totalReduces != null) {
            jobBeingTraced.setTotalReduces(Integer.parseInt(totalReduces));
          }

          if (submitTime != null) {
            jobBeingTraced.setSubmitTime(Long.parseLong(submitTime));
          }

          if (launchTime != null) {
            jobBeingTraced.setLaunchTime(Long.parseLong(launchTime));
          }

          if (finishTime != null) {
            jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
            if (status != null) {
              jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values
                  .valueOf(status));
            }

            maybeMateJobAndConf();
          }
        }

        if (jobName != null) {
          // assume a JAVA job unless the name matches the streaming-job pattern
          Matcher m = streamingJobnamePattern.matcher(jobName);

          thisJobType = LoggedJob.JobType.JAVA;

          if (m.matches()) {
            thisJobType = LoggedJob.JobType.STREAMING;
          }
        }
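        // a SUBMIT_TIME field marks a job's declaration line: remember the
        // submit time and reset the per-attempt timing tables used by the
        // statistical pass below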
        if (submitTime != null) {
          submitTimeCurrentJob = Long.parseLong(submitTime);

          currentJobID = jobID;

          taskAttemptStartTimes = new HashMap<String, Long>();
          taskReduceAttemptShuffleEndTimes = new HashMap<String, Long>();
          taskReduceAttemptSortEndTimes = new HashMap<String, Long>();
          taskMapAttemptFinishTimes = new HashMap<String, Long>();
          taskReduceAttemptFinishTimes = new HashMap<String, Long>();

          launchTimeCurrentJob = 0L;
        } else if (launchTime != null && jobID != null
            && currentJobID.equals(jobID)) {
          // a launch line for the job currently being tracked
          launchTimeCurrentJob = Long.parseLong(launchTime);
        } else if (finishTime != null && jobID != null
            && currentJobID.equals(jobID)) {
          // a finish line for the tracked job: compute its delay and run
          // times and fold its attempt timings into the aggregate
          // distributions
          long endTime = Long.parseLong(finishTime);

          if (launchTimeCurrentJob != 0) {
            String jobResultText = line.get("JOB_STATUS");

            JobOutcome thisOutcome =
                ((jobResultText != null && "SUCCESS".equals(jobResultText))
                    ? JobOutcome.SUCCESS : JobOutcome.FAILURE);

            if (submitTimeCurrentJob != 0L) {
              canonicalDistributionsEnter(delayTimeDists, thisOutcome,
                  thisJobType, launchTimeCurrentJob - submitTimeCurrentJob);
            }

            // launchTimeCurrentJob is already known to be non-zero here
            canonicalDistributionsEnter(runTimeDists, thisOutcome, thisJobType,
                endTime - launchTimeCurrentJob);

            // Now process the per-attempt timing tables; each attempt's phase
            // durations go into per-job histograms (for the spread statistics
            // below) and into the matching cross-job distributions

            Histogram currentJobMapTimes = new Histogram();
            Histogram currentJobShuffleTimes = new Histogram();
            Histogram currentJobSortTimes = new Histogram();
            Histogram currentJobReduceTimes = new Histogram();

            for (Map.Entry<String, Long> entry
                : taskAttemptStartTimes.entrySet()) {
              long startTime = entry.getValue();

              // Map processing
              Long mapEndTime = taskMapAttemptFinishTimes.get(entry.getKey());

              if (mapEndTime != null) {
                currentJobMapTimes.enter(mapEndTime - startTime);

                canonicalDistributionsEnter(mapTimeDists, thisOutcome,
                    thisJobType, mapEndTime - startTime);
              }

              // Reduce processing
              Long shuffleEnd =
                  taskReduceAttemptShuffleEndTimes.get(entry.getKey());
              Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
              Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());

              // a reduce attempt counts only when all three phase boundaries
              // were recorded
              if (shuffleEnd != null && sortEnd != null && reduceEnd != null) {
                currentJobShuffleTimes.enter(shuffleEnd - startTime);
                currentJobSortTimes.enter(sortEnd - shuffleEnd);
                currentJobReduceTimes.enter(reduceEnd - sortEnd);

                canonicalDistributionsEnter(shuffleTimeDists, thisOutcome,
                    thisJobType, shuffleEnd - startTime);
                canonicalDistributionsEnter(sortTimeDists, thisOutcome,
                    thisJobType, sortEnd - shuffleEnd);
                canonicalDistributionsEnter(reduceTimeDists, thisOutcome,
                    thisJobType, reduceEnd - sortEnd);
              }
            }

            // fold this job's per-phase histograms into the cross-job spread
            // distributions
            incorporateSpread(currentJobMapTimes, mapTimeSpreadDists,
                thisOutcome, thisJobType);
            incorporateSpread(currentJobShuffleTimes, shuffleTimeSpreadDists,
                thisOutcome, thisJobType);
            incorporateSpread(currentJobSortTimes, sortTimeSpreadDists,
                thisOutcome, thisJobType);
            incorporateSpread(currentJobReduceTimes, reduceTimeSpreadDists,
                thisOutcome, thisJobType);
          }
        }
      }
    } catch (NumberFormatException e) {
      LOG.warn(
          "HadoopLogsAnalyzer.processJobLine: bad numerical format, at line "
              + lineNumber + ".", e);
    }
  }
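
Note: the reduce-side bookkeeping above splits each attempt's timeline into
three adjacent intervals (shuffle, sort, reduce). A minimal sketch of that
arithmetic, using the Histogram type and enter(long) call seen above, with
made-up timestamps:

    // hypothetical phase boundaries for one reduce attempt, in milliseconds
    long startTime = 1000L;   // attempt start
    long shuffleEnd = 4000L;  // shuffle phase ends
    long sortEnd = 5000L;     // sort phase ends
    long reduceEnd = 9000L;   // reduce phase ends

    Histogram shuffleTimes = new Histogram();
    Histogram sortTimes = new Histogram();
    Histogram reduceTimes = new Histogram();

    shuffleTimes.enter(shuffleEnd - startTime); // 3000 ms of shuffle
    sortTimes.enter(sortEnd - shuffleEnd);      // 1000 ms of sort
    reduceTimes.enter(reduceEnd - sortEnd);     // 4000 ms of reduce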