in metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java [264:322]
/**
 * Creates the job for the given query and time range, submits it, and starts the status timer.
 *
 * <p>The interim result directory is a child of {@code baseInterimResultPath} whose name is
 * produced by {@code outputDirFormatter} from the time range and the query string. That path is
 * recorded in this object's {@code configuration} before the job is created.
 *
 * <p>When {@code createJob} returns {@code null}, nothing is submitted: an empty result set is
 * finalized and the job status is set to SUCCEEDED (or FAILED if finalizing throws).
 *
 * @param jobName forwarded to {@code createJob}
 * @param basePath forwarded to {@code createJob}
 * @param baseInterimResultPath parent directory under which the interim result directory is placed
 * @param beginNS start of the time range; divided by 1,000,000 to obtain epoch milliseconds for
 *     the debug log, i.e. treated as nanoseconds
 * @param endNS end of the time range, same unit as {@code beginNS}
 * @param numReducers forwarded to {@code createJob}
 * @param fields query fields, rendered to a string by {@code filterImpl}
 * @param conf forwarded to {@code createJob}
 * @param fs forwarded to {@code createJob}
 * @param filterImpl renders {@code fields} as a query string and is forwarded to {@code createJob}
 * @return this object
 * @throws IOException declared for {@code createJob} / job submission
 * @throws ClassNotFoundException declared for {@code createJob} / job submission
 * @throws InterruptedException declared for {@code createJob} / job submission
 */
public Statusable<Path> query(Optional<String> jobName,
    Path basePath,
    Path baseInterimResultPath,
    long beginNS,
    long endNS,
    int numReducers,
    T fields,
    Configuration conf,
    FileSystem fs,
    PcapFilterConfigurator<T> filterImpl)
    throws IOException, ClassNotFoundException, InterruptedException {
  final String resultDirName =
      outputDirFormatter.format(beginNS, endNS, filterImpl.queryToString(fields));
  logQueryTimeRange(beginNS, endNS, fields, filterImpl);

  final Path interimResultPath = new Path(baseInterimResultPath, resultDirName);
  PcapOptions.INTERIM_RESULT_PATH.put(configuration, interimResultPath);

  mrJob = createJob(
      jobName, basePath, interimResultPath, beginNS, endNS, numReducers, fields, conf, fs, filterImpl);
  if (mrJob == null) {
    finishWithEmptyResults();
    return this;
  }

  // Submission and the accompanying status update happen under this object's monitor so that
  // mrJob and jobStatus (references and internal state) are visible to the status timer thread
  // started right after. Earlier accesses are unsynchronized because, up to this point, only a
  // single thread has touched them.
  synchronized (this) {
    mrJob.submit();
    jobStatus.withState(State.SUBMITTED)
        .withDescription("Job submitted")
        .withJobId(mrJob.getJobID().toString());
  }
  startJobStatusTimerThread(statusInterval);
  return this;
}

/**
 * Emits a debug message describing the query and its time range; does nothing unless debug
 * logging is enabled.
 */
private void logQueryTimeRange(long beginNS,
    long endNS,
    T fields,
    PcapFilterConfigurator<T> filterImpl) {
  if (!LOG.isDebugEnabled()) {
    return;
  }
  final long nanosPerMilli = 1_000_000L;
  DateFormat timestampFormat = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.LONG);
  String rangeStart =
      timestampFormat.format(new Date(Long.divideUnsigned(beginNS, nanosPerMilli)));
  String rangeEnd =
      timestampFormat.format(new Date(Long.divideUnsigned(endNS, nanosPerMilli)));
  LOG.debug("Executing query {} on timerange from {} to {}",
      () -> filterImpl.queryToString(fields),
      () -> rangeStart,
      () -> rangeEnd);
}

/**
 * Handles the case where no job was created: finalizes an empty result set and updates the job
 * status accordingly.
 */
private void finishWithEmptyResults() {
  LOG.info("No files to process with specified date range.");
  try {
    setFinalResults(ignored -> new PcapPages(), configuration);
    jobStatus.withState(State.SUCCEEDED)
        .withDescription("No results in specified date range.")
        .withPercentComplete(100.0);
  } catch (JobException e) {
    // Not expected to happen, since the results are simply set to an empty result set.
    jobStatus.withState(State.FAILED)
        .withDescription("Unable to finalize empty job.")
        .withFailureException(e);
  }
}