in datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java [430:539]
/**
 * Determines which input paths must be processed in this run and records them in the given plan.
 *
 * <p>When {@code reusePreviousOutput} is true and a previous output exists, the latest previous
 * output is reused. For the days it covers that fall before the current date range's begin date,
 * the inputs are added to the plan as "old" inputs so they can be differenced out. Only days
 * after the previous output's end date are then added as "new" inputs. Otherwise every available
 * day in the current date range is added as a new input.
 *
 * <p>If the number of days with new input data reaches {@code getMaxToProcess()}, the scan stops
 * early and {@code plan._needAnotherPass} is set. This is only allowed when the previous output
 * is being reused.
 *
 * @param reusePreviousOutput whether to reuse the latest previous output, if one exists
 * @param plan the plan to populate with inputs, the reused previous output and the date range
 *        actually covered by this run ({@code plan._currentDateRange})
 * @throws IOException if an I/O error occurs, e.g. while reading the previous output's
 *         date range metadata
 * @throws MaxInputDataExceededException if the max amount of input data is exceeded while the
 *         previous output is not being reused (so multiple passes are impossible)
 * @throws IllegalStateException if incremental data needed to remove old days from the previous
 *         output is missing, or if a day's input is missing and fail-on-missing is enabled
 */
private void determineInputsToProcess(boolean reusePreviousOutput, Plan plan) throws IOException, MaxInputDataExceededException
{
  Calendar cal = Calendar.getInstance(PathUtils.timeZone);
  // Date range covered by the reused previous output; stays null when nothing is reused.
  DateRange outputDateRange = null;
  if (reusePreviousOutput)
  {
    outputDateRange = planPreviousOutputReuse(plan, cal);
  }
  // consume the incremental data and produce the final output
  int newDataCount = 0;
  Date startDate = getDateRange().getBeginDate();
  Date endDate = startDate;
  for (Date currentDate = startDate; currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
  {
    if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
    {
      if (!reusePreviousOutput)
      {
        throw new MaxInputDataExceededException(String.format("Amount of input data has exceeded max of %d however output is not being reused so cannot do in multiple passes", getMaxToProcess()));
      }
      // too much data to process in a single run, will require another pass
      plan._needAnotherPass = true;
      break;
    }
    // Days up to the previous output's end date are already reflected in that output,
    // so only days after it contribute new input.
    if (outputDateRange == null || currentDate.compareTo(outputDateRange.getEndDate()) > 0)
    {
      if (!getAvailableInputsByDate().containsKey(currentDate))
      {
        if (isFailOnMissing())
        {
          throw new IllegalStateException("missing " + PathUtils.datedPathFormat.format(currentDate));
        }
        _log.info("No input data found for " + PathUtils.datedPathFormat.format(currentDate));
      }
      else
      {
        addDayInputsToPlan(plan, getAvailableInputsByDate().get(currentDate), false);
        newDataCount++;
      }
    }
    // Record a copy of the last day reached; it becomes the end of this run's date range.
    cal.setTime(currentDate);
    endDate = cal.getTime();
    currentDate = advanceOneDay(cal, currentDate);
  }
  plan._currentDateRange = new DateRange(startDate, endDate);
}

/**
 * Sets up reuse of the latest previous output, if one exists.
 *
 * <p>The previous output is recorded in {@code plan._previousOutputToProcess}. For each day it
 * covers that precedes the current date range's begin date (and is no later than the previous
 * output's end date), that day's inputs are added to the plan as old inputs so they can be
 * removed from the previous output.
 *
 * @param plan the plan to populate
 * @param cal calendar in {@code PathUtils.timeZone} used for day arithmetic
 * @return the date range covered by the previous output, or null if there is no previous output
 * @throws IOException if an I/O error occurs while reading the previous output's date range
 * @throws IllegalStateException if the input for an old day that must be removed is missing
 */
private DateRange planPreviousOutputReuse(Plan plan, Calendar cal) throws IOException
{
  if (_outputPathsByDate.isEmpty())
  {
    _log.info("No previous output to reuse");
    return null;
  }
  DatePath latestPriorOutput = _outputPathsByDate.get(Collections.max(_outputPathsByDate.keySet()));
  _log.info("Have previous output, determining what previous incremental data to difference out");
  DateRange outputDateRange = AvroDateRangeMetadata.getOutputFileDateRange(getFileSystem(), latestPriorOutput.getPath());
  _log.info(String.format("Previous output has date range %s to %s",
                          PathUtils.datedPathFormat.format(outputDateRange.getBeginDate()),
                          PathUtils.datedPathFormat.format(outputDateRange.getEndDate())));
  for (Date currentDate = outputDateRange.getBeginDate();
       currentDate.compareTo(getDateRange().getBeginDate()) < 0
       && currentDate.compareTo(outputDateRange.getEndDate()) <= 0;
       currentDate = advanceOneDay(cal, currentDate))
  {
    // The old day's data is required to subtract it from the previous output.
    if (!getAvailableInputsByDate().containsKey(currentDate))
    {
      throw new IllegalStateException(String.format("Missing incremental data for %s, so can't remove it from previous output",PathUtils.datedPathFormat.format(currentDate)));
    }
    addDayInputsToPlan(plan, getAvailableInputsByDate().get(currentDate), true);
  }
  plan._previousOutputToProcess = latestPriorOutput;
  _log.info("Previous Output: " + plan._previousOutputToProcess.getPath());
  return outputDateRange;
}

/**
 * Adds one day's inputs to the plan.
 *
 * <p>Each input is added to {@code plan._inputsToProcess} and to either the old or the new input
 * list. It is also recorded in {@code plan._latestInputByPath} under its nested path root, so a
 * later input with the same root overwrites an earlier one.
 *
 * @param plan the plan to populate
 * @param inputs the inputs available for a single day
 * @param isOld true for inputs being differenced out of the previous output, false for new inputs
 */
private void addDayInputsToPlan(Plan plan, List<DatePath> inputs, boolean isOld)
{
  for (DatePath input : inputs)
  {
    _log.info(String.format(isOld ? "Old Input: %s" : "New Input: %s", input.getPath()));
    plan._inputsToProcess.add(input);
    if (isOld)
    {
      plan._oldInputsToProcess.add(input);
    }
    else
    {
      plan._newInputsToProcess.add(input);
    }
    Path root = PathUtils.getNestedPathRoot(input.getPath());
    plan._latestInputByPath.put(root.toString(), input.getPath().toString());
  }
}

/**
 * Returns a new date one calendar day after {@code date}, computed using {@code cal}.
 * The calendar's state is overwritten.
 */
private static Date advanceOneDay(Calendar cal, Date date)
{
  cal.setTime(date);
  cal.add(Calendar.DAY_OF_MONTH, 1);
  return cal.getTime();
}