in datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java [198:270]
/**
 * Builds the execution plan and stores it in {@code _plan}.
 *
 * <p>After loading input and output data and determining the available input dates and
 * the date range, one or two candidate plans are built:
 * <ul>
 *   <li>When {@code _reusePreviousOutput} is set, an alternative plan that does not reuse
 *       previous output is attempted first. If building it raises
 *       {@code MaxInputDataExceededException}, the message is logged and that alternative
 *       is simply dropped.</li>
 *   <li>A primary plan honoring {@code _reusePreviousOutput} is always built.</li>
 * </ul>
 * When more than one candidate exists, each is logged and the one with the smallest
 * {@code _totalBytes} is selected; on a tie the candidate that was added first wins.
 *
 * @throws IOException declared by this method; not raised directly here, so it can only
 *         originate from the helper methods invoked
 * @throws RuntimeException if a plan already exists, or wrapping the
 *         {@code MaxInputDataExceededException} raised while determining inputs for the
 *         primary plan
 */
public void createPlan() throws IOException
{
  if (_plan != null)
  {
    throw new RuntimeException("Plan already exists");
  }

  _log.info("Creating execution plan");

  loadInputData();
  loadOutputData();
  determineAvailableInputDates();
  determineDateRange();

  List<Plan> candidates = new ArrayList<Plan>();

  if (_reusePreviousOutput)
  {
    _log.info("Output may be reused, will create alternative plan that does not reuse output");

    Plan withoutReuse = new Plan();
    try
    {
      determineInputsToProcess(false,withoutReuse);
      withoutReuse.finalizePlan();
      candidates.add(withoutReuse);
    }
    catch (MaxInputDataExceededException e)
    {
      // The alternative is optional: record why it is unavailable and carry on.
      _log.info(e.getMessage());
    }
  }

  String reuseVerb = _reusePreviousOutput ? "reuses" : "does not reuse";
  _log.info(String.format("Creating plan that %s previous output",reuseVerb));

  Plan primary = new Plan();
  try
  {
    determineInputsToProcess(_reusePreviousOutput,primary);
  }
  catch (MaxInputDataExceededException e)
  {
    // Unlike the alternative, the primary plan is mandatory, so this is fatal.
    throw new RuntimeException(e);
  }
  primary.finalizePlan();
  candidates.add(primary);

  if (candidates.size() <= 1)
  {
    // Only the primary plan is available; nothing to compare.
    _plan = candidates.get(0);
    return;
  }

  _log.info(String.format("There are %d alternative execution plans:",candidates.size()));
  for (Plan candidate : candidates)
  {
    String outputUse = (candidate._previousOutputToProcess != null) ? "reuse" : "no";
    _log.info(String.format("* Consume %d new inputs, %d old inputs, %s previous output (%d bytes)",
                            candidate._newInputsToProcess.size(),
                            candidate._oldInputsToProcess.size(),
                            outputUse,
                            candidate._totalBytes));
  }

  // choose plan with least bytes consumed; the strict comparison keeps the
  // earliest-added candidate when byte counts are equal
  Plan cheapest = candidates.get(0);
  for (int i = 1; i < candidates.size(); i++)
  {
    Plan contender = candidates.get(i);
    if (contender._totalBytes.compareTo(cheapest._totalBytes) < 0)
    {
      cheapest = contender;
    }
  }

  _plan = cheapest;
  _log.info(String.format("Choosing plan consuming %d bytes",_plan._totalBytes));
}