in metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java [348:409]
private boolean updateStatus() {
JobStatus tempStatus = null;
final float mrJobFraction = 0.75f; // fraction of total job progress calculation we're allocating to the MR job vs finalization
synchronized (this) {
tempStatus = new JobStatus(jobStatus);
}
boolean keepUpdating = true;
try {
boolean mrJobComplete = false;
org.apache.hadoop.mapreduce.JobStatus.State mrJobState = null;
String mrJobFailureInfo = null;
float mapProg = 0.0f;
float reduceProg = 0.0f;
synchronized (this) {
mrJobComplete = mrJob.isComplete();
org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus();
mrJobState = mrJobStatus.getState();
mrJobFailureInfo = mrJobStatus.getFailureInfo();
mapProg = mrJob.mapProgress();
reduceProg = mrJob.reduceProgress();
}
if (mrJobComplete) {
switch (mrJobState) {
case SUCCEEDED:
tempStatus.withPercentComplete(100.0 * mrJobFraction).withState(State.FINALIZING).withDescription("Finalizing job.");
try {
synchronized (this) {
// want to update the description while the job is finalizing
jobStatus = new JobStatus(tempStatus);
}
setFinalResults(finalizer, configuration);
tempStatus.withPercentComplete(100.0).withState(State.SUCCEEDED).withDescription("Job completed.");
} catch (JobException je) {
tempStatus.withPercentComplete(100.0).withState(State.FAILED).withDescription("Job finalize failed.")
.withFailureException(je);
}
break;
case FAILED:
tempStatus.withPercentComplete(100.0).withState(State.FAILED).withDescription(mrJobFailureInfo);
break;
case KILLED:
tempStatus.withPercentComplete(100.0).withState(State.KILLED).withDescription(mrJobFailureInfo);
break;
}
keepUpdating = false;
} else {
float mrJobProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
float totalProgress = mrJobProgress * mrJobFraction;
String description = String
.format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100);
tempStatus.withPercentComplete(totalProgress).withState(State.RUNNING)
.withDescription(description);
}
} catch (InterruptedException | IOException e) {
tempStatus.withPercentComplete(100.0).withState(State.FAILED).withFailureException(e);
keepUpdating = false;
}
synchronized (this) {
jobStatus = new JobStatus(tempStatus);
}
return keepUpdating;
}