in hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Folder.java [220:499]
public int run() throws IOException {
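  // This method runs in two phases. First, the deskewed input trace is
  // split into temporary "segment" files, one per inputCycle-wide window of
  // submit times. Second, the segments are re-read in parallel and merged
  // through a priority queue ordered by adjusted submit time, with
  // maybeOutput() deciding how many copies of each job to transcribe.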
  class JobEntryComparator
      implements Comparator<Pair<LoggedJob, JobTraceReader>> {
    @Override
    public int compare(Pair<LoggedJob, JobTraceReader> p1,
        Pair<LoggedJob, JobTraceReader> p2) {
      LoggedJob j1 = p1.first();
      LoggedJob j2 = p2.first();

      return Long.compare(j1.getSubmitTime(), j2.getSubmitTime());
    }
  }
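  // This comparator drives the merge heap below: (job, reader) pairs are
  // ordered by the submit time of their LoggedJob.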
  // Initialize an empty heap up front so that, if an error occurs before
  // the real heap is built, the finally block below can still safely
  // iterate over it.
  Queue<Pair<LoggedJob, JobTraceReader>> heap =
      new PriorityQueue<Pair<LoggedJob, JobTraceReader>>();
  try {
    LoggedJob job = reader.nextJob();

    if (job == null) {
      LOG.error("The job trace is empty");
      return EMPTY_JOB_TRACE;
    }

    // If a starts-after time is specified, skip jobs until we reach the
    // starting time limit.
    if (startsAfter > 0) {
      LOG.info("starts-after time is specified. Initial job submit time: "
          + job.getSubmitTime());

      long approximateTime = job.getSubmitTime() + startsAfter;
      job = reader.nextJob();
      long skippedCount = 0;
      while (job != null && job.getSubmitTime() < approximateTime) {
        job = reader.nextJob();
        skippedCount++;
      }

      LOG.debug("Considering jobs with submit time more than " + startsAfter
          + " ms after the first job. Skipped " + skippedCount + " jobs.");

      if (job == null) {
        LOG.error("No more jobs to process in the trace with 'starts-after'"
            + " set to " + startsAfter + "ms.");
        return EMPTY_JOB_TRACE;
      }

      LOG.info("The first job has a submit time of " + job.getSubmitTime());
    }
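    // From here on, the submit time of the first surviving job anchors all
    // interval arithmetic: every inputCycle-wide window is measured from
    // firstJobSubmitTime.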
    firstJobSubmitTime = job.getSubmitTime();
    long lastJobSubmitTime = firstJobSubmitTime;

    int numberJobs = 0;

    long currentIntervalEnd = Long.MIN_VALUE;

    Path nextSegment = null;
    Outputter<LoggedJob> tempGen = null;

    if (debug) {
      LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
    }
    final Configuration conf = getConf();

    // One Random suffices for all temporary segment names; there is no need
    // to construct a new generator on every loop iteration.
    final Random tempNameGenerator = new Random();

    try {
      // At the top of this loop, skewBuffer has at most
      // skewBufferLength entries.
      while (job != null) {
        lastJobSubmitTime = job.getSubmitTime();
        ++numberJobs;
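        // When this job's submit time crosses the end of the current
        // interval, close the current segment file and rotate to a fresh
        // one for the next inputCycle-wide window.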
        if (job.getSubmitTime() >= currentIntervalEnd) {
          if (tempGen != null) {
            tempGen.close();
          }

          nextSegment = null;
          for (int i = 0; i < 3 && nextSegment == null; ++i) {
            try {
              nextSegment =
                  new Path(tempDir, "segment-" + tempNameGenerator.nextLong()
                      + ".json.gz");

              if (debug) {
                LOG.debug("The next segment name is " + nextSegment);
              }

              FileSystem fs = nextSegment.getFileSystem(conf);

              try {
                if (!fs.exists(nextSegment)) {
                  // the name is unused; keep it
                  break;
                }

                // the name collides with an existing file -- discard it so
                // the loop condition lets us draw another one
                nextSegment = null;
                continue;
              } catch (IOException e) {
                // no code -- the existence probe failed, so assume the file
                // does not already exist and keep this name
              }
            } catch (IOException e) {
              // the path or its filesystem is bad -- discard the candidate
              // so we try again, up to three times
              nextSegment = null;
            }
          }

          if (nextSegment == null) {
            throw new RuntimeException(
                "Failed to find an unused segment file name in three tries!");
          }
          if (debug) {
            LOG.debug("Creating " + nextSegment
                + " for a job with a submit time of " + job.getSubmitTime());
          }

          deletees.add(nextSegment);
          tempPaths.add(nextSegment);

          tempGen = new DefaultOutputter<LoggedJob>();
          tempGen.init(nextSegment, conf);
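          // Compute which inputCycle-wide interval (counted from
          // firstJobSubmitTime) this job falls into, and record where that
          // interval ends so we know when to rotate segments again.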
          long currentIntervalNumber =
              (job.getSubmitTime() - firstJobSubmitTime) / inputCycle;
          currentIntervalEnd =
              firstJobSubmitTime + ((currentIntervalNumber + 1) * inputCycle);
        }
        // The temp files contain unadjusted times, but each temp file's
        // content falls within the same input cycle interval.
        if (tempGen != null) {
          tempGen.output(job);
        }

        job = reader.nextJob();
      }
    } catch (DeskewedJobTraceReader.OutOfOrderException e) {
      return OUT_OF_ORDER_JOBS;
    } finally {
      if (tempGen != null) {
        tempGen.close();
      }
    }
    if (lastJobSubmitTime <= firstJobSubmitTime) {
      LOG.error("All of your jobs have the same submit time."
          + " Please just use your input file.");
      return ALL_JOBS_SIMULTANEOUS;
    }

    double submitTimeSpan = lastJobSubmitTime - firstJobSubmitTime;

    LOG.warn("Your input trace spans "
        + (lastJobSubmitTime - firstJobSubmitTime) + " ticks.");

    double foldingRatio =
        submitTimeSpan * (numberJobs + 1) / numberJobs / inputCycle;

    if (debug) {
      LOG.warn("run: submitTimeSpan = " + submitTimeSpan + ", numberJobs = "
          + numberJobs + ", inputCycle = " + inputCycle);
    }

    if (reader.neededSkewBufferSize() > 0) {
      LOG.warn("You needed a -skew-buffer-length of "
          + reader.neededSkewBufferSize() + " but no more, for this input.");
    }
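    // foldingRatio estimates how many inputCycle-wide windows the trace
    // spans; the (numberJobs + 1) / numberJobs factor appears to be a
    // small-sample correction for the open interval between the first and
    // last submissions.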
    double tProbability = timeDilation * concentration / foldingRatio;

    if (debug) {
      LOG.warn("run: timeDilation = " + timeDilation + ", concentration = "
          + concentration + ", foldingRatio = " + foldingRatio);
      LOG.warn("The transcription probability is " + tProbability);
    }

    transcriptionRateInteger = (int) Math.floor(tProbability);
    transcriptionRateFraction = tProbability - Math.floor(tProbability);
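    // The transcription rate is split into whole and fractional parts;
    // presumably maybeOutput() (not shown here) emits each job
    // transcriptionRateInteger times, plus one more copy with probability
    // transcriptionRateFraction.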
    // Now read all the inputs in parallel
    heap =
        new PriorityQueue<Pair<LoggedJob, JobTraceReader>>(tempPaths.size(),
            new JobEntryComparator());
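    // Seed the heap with the first job of each segment file.
    // adjustJobTimes() maps each job's times onto the folded timeline, so
    // the heap orders entries by adjusted submit time.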
    for (Path tempPath : tempPaths) {
      JobTraceReader thisReader = new JobTraceReader(tempPath, conf);
      closees.add(thisReader);

      LoggedJob streamFirstJob = thisReader.getNext();

      long thisIndex =
          (streamFirstJob.getSubmitTime() - firstJobSubmitTime) / inputCycle;

      if (debug) {
        LOG.debug("A job with submit time of "
            + streamFirstJob.getSubmitTime() + " is in interval # "
            + thisIndex);
      }

      adjustJobTimes(streamFirstJob);

      if (debug) {
        LOG.debug("That job's submit time is adjusted to "
            + streamFirstJob.getSubmitTime());
      }

      heap.add(
          new Pair<LoggedJob, JobTraceReader>(streamFirstJob, thisReader));
    }
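    // Standard k-way merge: repeatedly pull the earliest adjusted job off
    // the heap, hand it to maybeOutput(), and replace it with the next job
    // from the same segment reader until every reader is drained.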
    Pair<LoggedJob, JobTraceReader> next = heap.poll();

    while (next != null) {
      maybeOutput(next.first());

      if (debug) {
        LOG.debug("The most recent job has an adjusted submit time of "
            + next.first().getSubmitTime());
        LOG.debug(" Its replacement in the heap will come from input engine "
            + next.second());
      }

      LoggedJob replacement = next.second().getNext();

      if (replacement == null) {
        next.second().close();

        if (debug) {
          LOG.debug("That input engine is depleted.");
        }
      } else {
        adjustJobTimes(replacement);

        if (debug) {
          LOG.debug("The replacement has an adjusted submit time of "
              + replacement.getSubmitTime());
        }

        heap.add(
            new Pair<LoggedJob, JobTraceReader>(replacement, next.second()));
      }

      next = heap.poll();
    }
  } finally {
    IOUtils.cleanupWithLogger(null, reader);

    if (outGen != null) {
      outGen.close();
    }

    for (Pair<LoggedJob, JobTraceReader> heapEntry : heap) {
      heapEntry.second().close();
    }

    for (Closeable closee : closees) {
      closee.close();
    }
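    // Unless running in debug mode, remove the temporary segment files
    // recorded in deletees; with -debug they are deliberately left behind,
    // presumably for inspection.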
    if (!debug) {
      Configuration conf = getConf();

      for (Path deletee : deletees) {
        FileSystem fs = deletee.getFileSystem(conf);

        try {
          fs.delete(deletee, false);
        } catch (IOException e) {
          // no code -- cleanup is best-effort, so deletion failures are
          // ignored
        }
      }
    }
  }
  return 0;
}