in datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java [234:275]
/**
 * Walks the configured date range one day at a time, beginning date through end date
 * inclusive, and selects the input paths that this run should consume.
 *
 * <p>A day is skipped when {@code _outputPathsByDate} already has an entry for it. For
 * every other day that has available inputs, all of that day's inputs are appended to
 * {@code _inputsToProcess}. In addition, {@code _latestInputByPath} maps each input's
 * nested path root to the last input path visited under that root. Days are visited in
 * ascending order, so later days overwrite earlier ones.
 *
 * <p>{@code getMaxToProcess()}, when non-null, caps the number of days whose inputs are
 * selected. When the cap is reached, the walk stops at the next day that has inputs and
 * {@code _needAnotherPass} is set to {@code true}.
 *
 * <p>Note: only {@code _latestInputByPath} is cleared here; {@code _inputsToProcess} is
 * appended to without being reset.
 *
 * @throws RuntimeException if a day has neither existing output nor available inputs
 *         and {@code isFailOnMissing()} is true
 */
private void determineInputsToProcess()
{
  _log.info("Determining inputs to process");
  _latestInputByPath.clear();

  // Counts days whose inputs were selected (not individual paths); compared against getMaxToProcess().
  int daysSelected = 0;
  final Calendar stepper = Calendar.getInstance(PathUtils.timeZone);

  Date day = getDateRange().getBeginDate();
  while (day.compareTo(getDateRange().getEndDate()) <= 0)
  {
    final boolean hasOutput = _outputPathsByDate.containsKey(day);
    // Available inputs are only consulted for days that do not yet have output.
    final List<DatePath> dayInputs = hasOutput ? null : getAvailableInputsByDate().get(day);

    if (dayInputs == null)
    {
      if (!hasOutput && isFailOnMissing())
      {
        throw new RuntimeException("missing input data for " + day);
      }
    }
    else
    {
      final boolean limitReached = getMaxToProcess() != null && daysSelected >= getMaxToProcess();
      if (limitReached)
      {
        // Too much data for a single run; the remaining days are left for another pass.
        _needAnotherPass = true;
        break;
      }
      for (DatePath candidate : dayInputs)
      {
        _log.info(String.format("Input: %s", candidate.getPath()));
        _inputsToProcess.add(candidate);
        String rootKey = PathUtils.getNestedPathRoot(candidate.getPath()).toString();
        _latestInputByPath.put(rootKey, candidate.getPath().toString());
      }
      daysSelected++;
    }

    // Advance by one calendar day in the PathUtils time zone.
    stepper.setTime(day);
    stepper.add(Calendar.DAY_OF_MONTH, 1);
    day = stepper.getTime();
  }
}