in datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java [328:415]
/**
 * Works out which input dates can be processed and records the input paths found for
 * each of them in {@code _availableInputsByDate}.
 *
 * <p>Every date key of every map in {@code _inputPathsByDate} is gathered. A date is
 * treated as complete when it occurs {@code getInputPaths().size()} times across those
 * maps. The method handles incomplete dates as follows:
 * <ul>
 *   <li>Incomplete dates that sort before the first complete date are dropped without
 *       logging.</li>
 *   <li>Any incomplete date after that point is logged, together with the paths that
 *       were found for it.</li>
 *   <li>That later incomplete date then either aborts the method (when
 *       {@code _failOnMissing} is set) or is kept anyway.</li>
 * </ul>
 * Kept dates are mapped to the non-null paths for that date, listed in
 * {@code _inputPathsByDate} order. Any previous contents of
 * {@code _availableInputsByDate} are cleared first.
 *
 * <p>NOTE(review): the required count comes from {@code getInputPaths()}, while the
 * occurrences are counted from {@code _inputPathsByDate}. This presumably relies on the
 * two having the same number of entries — confirm against their construction.
 *
 * @throws RuntimeException if no input dates exist at all, if a date occurs more often
 *         than {@code getInputPaths().size()}, or if an incomplete date follows a
 *         complete one while {@code _failOnMissing} is set
 */
protected void determineAvailableInputDates()
{
  // Min-heap of every date from every input. Duplicates are kept on purpose, so that
  // equal dates come out consecutively and can be counted.
  PriorityQueue<Date> pending = new PriorityQueue<Date>();
  for (SortedMap<Date,DatePath> inputMap : _inputPathsByDate)
  {
    pending.addAll(inputMap.keySet());
  }
  if (pending.isEmpty())
  {
    throw new RuntimeException("No input data!");
  }

  List<Date> keep = new ArrayList<Date>();
  int required = getInputPaths().size();
  while (!pending.isEmpty())
  {
    Date day = pending.peek();

    // Drain every copy of this date, counting how many inputs supplied it.
    int occurrences = 0;
    Date next;
    do
    {
      pending.poll();
      occurrences++;
      // Sanity check: a date should never be supplied by more inputs than exist.
      if (occurrences > required)
      {
        throw new RuntimeException("found more than needed");
      }
      next = pending.peek();
    }
    while (next != null && next.equals(day));

    if (occurrences == required)
    {
      keep.add(day);
      continue;
    }
    if (keep.isEmpty())
    {
      // No complete date seen yet: skip this leading partial date quietly.
      continue;
    }

    // A gap after the first complete date: report what was actually found.
    String label = PathUtils.datedPathFormat.format(day);
    _log.info("Did not find all input data for date " + label);
    _log.info("Paths found for " + label + ":");
    for (SortedMap<Date,DatePath> inputMap : _inputPathsByDate)
    {
      DatePath hit = inputMap.get(day);
      if (hit != null)
      {
        _log.info("=> " + hit);
      }
    }
    if (_failOnMissing)
    {
      throw new RuntimeException("Did not find all input data for date " + label);
    }
    keep.add(day);
  }

  // Rebuild the date -> paths index from scratch for the dates we decided to keep.
  _availableInputsByDate.clear();
  for (Date kept : keep)
  {
    List<DatePath> present = new ArrayList<DatePath>();
    for (SortedMap<Date,DatePath> inputMap : _inputPathsByDate)
    {
      DatePath candidate = inputMap.get(kept);
      if (candidate != null)
      {
        present.add(candidate);
      }
    }
    _availableInputsByDate.put(kept, present);
  }
}