in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java [380:568]
private void execute() throws IOException, InterruptedException, ClassNotFoundException
{
  int iterations = 0;
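
  // Each pass plans and runs one MapReduce job; keep looping until the
  // planner reports that no further input remains to be processed.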
  while (true)
  {
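    // Build an execution plan for this pass: which dated inputs to consume,
    // whether the previous output can be reused, and the date range covered.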
    PartitionCollapsingExecutionPlanner planner = new PartitionCollapsingExecutionPlanner(getFileSystem(), getProperties());
    planner.setInputPaths(getInputPaths());
    planner.setOutputPath(getOutputPath());
    planner.setStartDate(getStartDate());
    planner.setEndDate(getEndDate());
    planner.setDaysAgo(getDaysAgo());
    planner.setNumDays(getNumDays());
    planner.setMaxToProcess(getMaxToProcess());
    planner.setReusePreviousOutput(getReusePreviousOutput());
    planner.setFailOnMissing(isFailOnMissing());
    planner.createPlan();
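
    // Nothing left to process means the job is done; otherwise fail fast if the
    // iteration limit has been reached while input still remains.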
    if (planner.getInputsToProcess().size() == 0)
    {
      _log.info("Nothing to do");
      break;
    }

    if (iterations >= getMaxIterations())
    {
      throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
                                               iterations,
                                               getMaxIterations(),
                                               planner.getInputsToProcess().size()));
    }
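
    // Track what this pass consumed and produced for reporting.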
    Report report = new Report();
    report.inputFiles.addAll(planner.getNewInputsToProcess());
    report.oldInputFiles.addAll(planner.getOldInputsToProcess());
    if (planner.getPreviousOutputToProcess() != null)
    {
      report.reusedOutput = planner.getPreviousOutputToProcess();
    }
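
    // Compute the dated output path for this pass; the job writes to a random
    // temp path (registered for cleanup) and is staged to the final dated path
    // via StagedOutputJob.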
    DatePath outputPath = DatePath.createDatedPath(getOutputPath(), planner.getCurrentDateRange().getEndDate());
    _log.info("Output path: " + outputPath);

    Path tempOutputPath = createRandomTempPath();
    _garbage.add(tempOutputPath);

    final StagedOutputJob job = StagedOutputJob.createStagedJob(
        getConf(),
        getName() + "-" + PathUtils.datedPathFormat.format(planner.getCurrentDateRange().getEndDate()),
        null, // no input paths specified here, will add multiple inputs down below
        tempOutputPath.toString(),
        outputPath.getPath().toString(),
        _log);

    job.setCountersParentPath(getCountersParentPath());
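
    // Register each input with its own input format and mapper: new and old
    // dated inputs go through the delegating Avro mapper, while reused previous
    // output is read back through an identity mapper.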
    if (planner.getNewInputsToProcess() != null && planner.getNewInputsToProcess().size() > 0)
    {
      _log.info("*** New Input data:");
      for (DatePath inputPath : planner.getNewInputsToProcess())
      {
        _log.info(inputPath.getPath());
        MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
      }
    }

    if (planner.getOldInputsToProcess() != null && planner.getOldInputsToProcess().size() > 0)
    {
      _log.info("*** Old Input data:");
      for (DatePath inputPath : planner.getOldInputsToProcess())
      {
        _log.info(inputPath.getPath());
        MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
      }
    }

    if (planner.getPreviousOutputToProcess() != null)
    {
      _log.info("*** Previous output data:");
      _log.info(planner.getPreviousOutputToProcess().getPath());
      MultipleInputs.addInputPath(job, planner.getPreviousOutputToProcess().getPath(), AvroKeyInputFormat.class, AvroKeyValueIdentityMapper.class);
    }
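
    // Apply custom configuration, record the output date range in the job conf,
    // and wire up the Avro schemas: per-path input key schemas, intermediate
    // map output key/value schemas, and the final reduce output schema.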
    final Configuration conf = job.getConfiguration();

    config(conf);

    AvroDateRangeMetadata.configureOutputDateRange(conf, planner.getCurrentDateRange());

    PartitionCollapsingSchemas spSchemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());

    job.setOutputFormatClass(AvroKeyWithMetadataOutputFormat.class);

    _log.info("Setting input path to schema mappings");
    for (String path : spSchemas.getMapInputSchemas().keySet())
    {
      Schema schema = spSchemas.getMapInputSchemas().get(path);
      _log.info("*** " + path);
      _log.info("*** => " + schema.toString());
      AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
    }

    AvroJob.setMapOutputKeySchema(job, spSchemas.getMapOutputKeySchema());
    AvroJob.setMapOutputValueSchema(job, spSchemas.getMapOutputValueSchema());
    AvroJob.setOutputKeySchema(job, spSchemas.getReduceOutputSchema());
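
    // Use an explicitly configured reducer count if one was set; otherwise fall
    // back to the count computed by the planner.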
    int numReducers;
    if (getNumReducers() != null)
    {
      numReducers = getNumReducers();
      _log.info(String.format("Using %d reducers (fixed)", numReducers));
    }
    else
    {
      numReducers = planner.getNumReducers();
      _log.info(String.format("Using %d reducers (computed)", numReducers));
    }
    job.setNumReduceTasks(numReducers);

    job.setReducerClass(DelegatingReducer.class);
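
    // The mapper and reducer implementations are serialized into the temp
    // directory and shipped via the distributed cache; the delegating classes
    // load them at task runtime through the paths stored in the configuration.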
    Path mapperPath = new Path(tempOutputPath, ".mapper_impl");
    Path reducerPath = new Path(tempOutputPath, ".reducer_impl");
    Path combinerPath = new Path(tempOutputPath, ".combiner_impl");

    CollapsingMapper mapper = new CollapsingMapper();
    CollapsingReducer reducer = new CollapsingReducer();

    mapper.setSchemas(spSchemas);
    reducer.setSchemas(spSchemas);

    mapper.setMapper(getMapper());
    reducer.setAccumulator(getReducerAccumulator());
    reducer.setRecordMerger(getRecordMerger());
    reducer.setOldRecordMerger(getOldRecordMerger());

    mapper.setReuseOutput(_reusePreviousOutput);
    reducer.setReuseOutput(_reusePreviousOutput);

    configureOutputDateRange(job.getConfiguration(), planner.getCurrentDateRange(), reducer);

    DistributedCacheHelper.writeObject(conf, mapper, mapperPath);
    DistributedCacheHelper.writeObject(conf, reducer, reducerPath);

    conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
    conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
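
    // Optionally configure a combiner, serialized and wired up the same way as
    // the mapper and reducer.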
    if (isUseCombiner())
    {
      CollapsingCombiner combiner = new CollapsingCombiner();
      configureOutputDateRange(job.getConfiguration(), planner.getCurrentDateRange(), combiner);
      combiner.setReuseOutput(_reusePreviousOutput);
      combiner.setSchemas(spSchemas);
      combiner.setAccumulator(getCombinerAccumulator());
      conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
      job.setCombinerClass(DelegatingCombiner.class);
      DistributedCacheHelper.writeObject(conf, combiner, combinerPath);
    }
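
    // Run the job; on success record the report, apply the retention policy,
    // and either finish or clean up and start another pass.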
    if (!job.waitForCompletion(true))
    {
      _log.error("Job failed! Quitting...");
      throw new RuntimeException("Job failed");
    }

    report.jobId = job.getJobID().toString();
    report.jobName = job.getJobName();
    report.countersPath = job.getCountersPath();
    report.outputPath = outputPath;

    _reports.add(report);

    applyRetention();

    if (!planner.getNeedsAnotherPass())
    {
      break;
    }

    cleanup();

    iterations++;
  }
}