protected void determineAvailableInputDates()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java [328:415]


  protected void determineAvailableInputDates()
  {
    // first find the latest date available for all inputs
    PriorityQueue<Date> dates = new PriorityQueue<Date>();
    for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
    {
      for (Date date : pathMap.keySet())
      {
        dates.add(date);
      }
    }
    if (dates.size() == 0)
    {
      throw new RuntimeException("No input data!");
    }
    List<Date> available = new ArrayList<Date>();
    Date currentDate = dates.peek();
    int found = 0;
    int needed = getInputPaths().size();
    while (currentDate != null)
    {
      Date date = dates.poll();
      
      if (date != null && date.equals(currentDate))
      {
        found++;
      }
      else
      {
        if (found == needed)
        {
          available.add(currentDate);
        }
        else if (available.size() > 0)
        {
          _log.info("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
          _log.info("Paths found for " + PathUtils.datedPathFormat.format(currentDate) + ":");
          // collect what's available for this date
          for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
          {
            DatePath path = pathMap.get(currentDate);
            if (path != null)
            {
              _log.info("=> " + path);
            }
          }
          
          if (_failOnMissing)
          {
            throw new RuntimeException("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
          }
          else
          {
            available.add(currentDate);
          }
        }
        
        found = 0;          
        currentDate = date;
        
        if (currentDate != null)
        {
          found++;
        }
      }        
      
      if (found > needed)
      {
        throw new RuntimeException("found more than needed");
      }        
    }
    
    _availableInputsByDate.clear();
    
    for (Date date : available)
    {
      List<DatePath> paths = new ArrayList<DatePath>();
      for (SortedMap<Date,DatePath> map : _inputPathsByDate)
      {
        DatePath path = map.get(date);
        if (path != null)
        {
          paths.add(path);
        }
      }
      _availableInputsByDate.put(date, paths);
    }
  }