in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java [318:518]
  private void execute() throws IOException, InterruptedException, ClassNotFoundException
  {
    int iterations = 0;

    while(true)
    {
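
      // Plan the next incremental pass: determine which input paths still need to be processed.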
      PartitionPreservingExecutionPlanner planner = new PartitionPreservingExecutionPlanner(getFileSystem(),getProperties());
      planner.setInputPaths(getInputPaths());
      planner.setOutputPath(getOutputPath());
      planner.setStartDate(getStartDate());
      planner.setEndDate(getEndDate());
      planner.setDaysAgo(getDaysAgo());
      planner.setNumDays(getNumDays());
      planner.setMaxToProcess(getMaxToProcess());
      planner.setFailOnMissing(isFailOnMissing());
      planner.createPlan();

      if (planner.getInputsToProcess().size() == 0)
      {
        _log.info("Found all necessary incremental data");
        break;
      }

      if (iterations >= getMaxIterations())
      {
        throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
                                                 iterations,
                                                 getMaxIterations(),
                                                 planner.getInputsToProcess().size()));
      }
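
      // Create temporary staging directories for this pass; the temp path is tracked for later cleanup.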
      Path jobTempPath = createRandomTempPath();
      _garbage.add(jobTempPath);
      ensurePath(getOutputPath());

      Path incrementalStagingPath = ensurePath(new Path(jobTempPath,".incremental-staging"));
      Path incrementalStagingTmpPath = ensurePath(new Path(jobTempPath,".incremental-staging-tmp"));

      Report report = new Report();

      // create input paths for job
      List<String> inputPaths = new ArrayList<String>();
      for (DatePath input : planner.getInputsToProcess())
      {
        inputPaths.add(input.getPath().toString());
        report.inputFiles.add(input);
      }
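
      // Create the MapReduce job, writing output to the staging path; the staged files are moved into the final output below.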
_log.info("Staging path: " + incrementalStagingPath);
final StagedOutputJob job = StagedOutputJob.createStagedJob(
getConf(),
getName() + "-" + "incremental",
inputPaths,
incrementalStagingTmpPath.toString(),
incrementalStagingPath.toString(),
_log);
job.setCountersParentPath(getCountersParentPath());
final Configuration conf = job.getConfiguration();
config(conf);
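
      // Derive the Avro schemas for the map input, intermediate key/value, and reduce output records.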
      PartitionPreservingSchemas fpSchemas = new PartitionPreservingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());

      job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);
      job.setOutputFormatClass(AvroKeyOutputFormat.class);

      _log.info("Setting input path to schema mappings");
      for (String path : fpSchemas.getMapInputSchemas().keySet())
      {
        Schema schema = fpSchemas.getMapInputSchemas().get(path);
        _log.info("*** " + path);
        _log.info("*** => " + schema.toString());
        AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
      }
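
      // Register the intermediate and final output schemas with the job.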
      AvroJob.setMapOutputKeySchema(job, fpSchemas.getMapOutputKeySchema());
      AvroJob.setMapOutputValueSchema(job, fpSchemas.getMapOutputValueSchema());
      AvroJob.setOutputKeySchema(job, fpSchemas.getReduceOutputSchema());
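
      // Add one named output per date so each day's data lands in its own partition,
      // and collect the input times for the time-based partitioner.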
      StringBuilder inputTimesJoined = new StringBuilder();
      for (Date input : planner.getDatesToProcess())
      {
        String namedOutput = PathUtils.datedPathFormat.format(input);
        _log.info(String.format("Adding named output %s",namedOutput));
        AvroMultipleOutputs.addNamedOutput(job,
                                           namedOutput,
                                           AvroKeyOutputFormat.class,
                                           fpSchemas.getReduceOutputSchema());
        inputTimesJoined.append(Long.toString(input.getTime()));
        inputTimesJoined.append(",");
      }
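
      // Use an explicitly configured reducer count if one was set; otherwise use the planner's estimate.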
      int numReducers;
      if (getNumReducers() != null)
      {
        numReducers = getNumReducers();
        _log.info(String.format("Using %d reducers (fixed)",numReducers));
      }
      else
      {
        numReducers = planner.getNumReducers();
        _log.info(String.format("Using %d reducers (computed)",numReducers));
      }

      int avgReducersPerInput = (int)Math.ceil(numReducers/(double)planner.getDatesToProcess().size());
      _log.info(String.format("Reducers per input path: %d", avgReducersPerInput));

      // counters for multiple outputs
      // conf.set("mo.counters", "true");

      conf.set(TimePartitioner.REDUCERS_PER_INPUT, Integer.toString(avgReducersPerInput));
      conf.set(TimePartitioner.INPUT_TIMES, inputTimesJoined.substring(0,inputTimesJoined.length()-1));

      job.setNumReduceTasks(numReducers);
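
      // The mapper, reducer, and combiner implementations are serialized to these paths
      // so the delegating task classes can load them at runtime.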
      Path mapperPath = new Path(incrementalStagingPath,".mapper_impl");
      Path reducerPath = new Path(incrementalStagingPath,".reducer_impl");
      Path combinerPath = new Path(incrementalStagingPath,".combiner_impl");

      conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
      conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());

      _mapper = new PartitioningMapper();
      _mapper.setSchemas(fpSchemas);
      _mapper.setMapper(getMapper());

      _reducer = new PartitioningReducer();
      _reducer.setSchemas(fpSchemas);
      _reducer.setAccumulator(getReducerAccumulator());

      DistributedCacheHelper.writeObject(conf, getMapProcessor(), mapperPath);
      DistributedCacheHelper.writeObject(conf, getReduceProcessor(), reducerPath);

      job.setMapperClass(DelegatingMapper.class);
      job.setReducerClass(DelegatingReducer.class);

      if (isUseCombiner())
      {
        _combiner = new PartitioningCombiner();
        _combiner.setAccumulator(getCombinerAccumulator());
        conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
        job.setCombinerClass(DelegatingCombiner.class);
        DistributedCacheHelper.writeObject(conf, getCombineProcessor(), combinerPath);
      }
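
      // Partition intermediate records by time so each day's data goes to its own set of reducers.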
      job.setPartitionerClass(TimePartitioner.class);

      if (!job.waitForCompletion(true))
      {
        _log.error("Job failed! Quitting...");
        throw new RuntimeException("Job failed");
      }
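
      // Record the job details and move the staged output into the final output path.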
      report.jobName = job.getJobName();
      report.jobId = job.getJobID().toString();

      moveStagedFiles(report,incrementalStagingPath);

      if (getCountersParentPath() == null && job.getCountersPath() != null)
      {
        // save the counters in the target path, for lack of a better place to put it
        Path counters = job.getCountersPath();
        if (getFileSystem().exists(counters))
        {
          Path target = new Path(getOutputPath(),counters.getName());
          if (getFileSystem().exists(target))
          {
            _log.info(String.format("Removing old counters at %s",target));
            getFileSystem().delete(target, true);
          }
          _log.info(String.format("Moving %s to %s",counters.getName(),getOutputPath()));
          getFileSystem().rename(counters, target);
          report.countersPath = target;
        }
        else
        {
          _log.error("Could not find counters at " + counters);
        }
      }

      applyRetention();

      _reports.add(report);
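
      // The planner may cap how much data is processed in one pass (maxToProcess);
      // if more inputs remain, clean up and run another pass.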
      if (!planner.getNeedsAnotherPass())
      {
        break;
      }

      cleanup();

      iterations++;
    }
  }