private void determineInputsToProcess()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java [430:539]


  /**
   * Fills {@code plan} with the inputs the next collapsing job has to read.
   *
   * <p>Phase 1 (only when {@code reusePreviousOutput} is true and {@code _outputPathsByDate} is
   * non-empty):
   * <ul>
   *   <li>The entry with the greatest key is taken as the previous output.</li>
   *   <li>Its date range is read with {@code AvroDateRangeMetadata.getOutputFileDateRange}.</li>
   *   <li>Each day from that range's begin date is recorded as an "old" input, so it can be
   *       differenced out. This stops at (but excludes) the requested begin date, and never goes
   *       past the previous range's end date.</li>
   * </ul>
   *
   * <p>Phase 2 walks the requested date range one day at a time:
   * <ul>
   *   <li>Days not already covered by the previous output contribute their inputs as "new"
   *       inputs.</li>
   *   <li>When no previous output is reused, every day is treated as not covered.</li>
   *   <li>The walk stops early once {@code getMaxToProcess()} days with data have been consumed.
   *       In that case {@code plan._needAnotherPass} is set.</li>
   * </ul>
   *
   * <p>{@code plan._currentDateRange} always starts at the requested begin date. It ends at the
   * last day that was actually walked.
   *
   * @param reusePreviousOutput whether the newest previous output may be reused and updated
   *     incrementally instead of being recomputed from scratch
   * @param plan receives the combined/old/new input lists, the latest input path per nested path
   *     root, the previous output (if any), whether another pass is needed, and the date range
   *     covered by this pass
   * @throws IOException declared for the file-system access done while inspecting the previous
   *     output (presumably raised by {@code getFileSystem()} or the metadata read — TODO confirm)
   * @throws MaxInputDataExceededException if the processing limit is reached while previous
   *     output is not being reused, so the work cannot be split into several passes
   * @throws RuntimeException if an "old" day's input is unavailable, or if a "new" day's input is
   *     unavailable and {@code isFailOnMissing()} is true
   */
  private void determineInputsToProcess(boolean reusePreviousOutput, Plan plan) throws IOException, MaxInputDataExceededException
  {
    // Day stepping is done in the same time zone used for dated paths.
    Calendar calendar = Calendar.getInstance(PathUtils.timeZone);

    // Remains null unless a previous output is actually selected for reuse; phase 2 relies on that.
    DateRange previousRange = null;

    if (reusePreviousOutput)
    {
      if (_outputPathsByDate.isEmpty())
      {
        _log.info("No previous output to reuse");
      }
      else
      {
        DatePath newestOutput = _outputPathsByDate.get(Collections.max(_outputPathsByDate.keySet()));
        _log.info("Have previous output, determining what previous incremental data to difference out");
        previousRange = AvroDateRangeMetadata.getOutputFileDateRange(getFileSystem(), newestOutput.getPath());
        _log.info(String.format("Previous output has date range %s to %s",
                                PathUtils.datedPathFormat.format(previousRange.getBeginDate()),
                                PathUtils.datedPathFormat.format(previousRange.getEndDate())));

        // Days the previous output covers that fall before the requested range must be re-read
        // so their contribution can be removed from that output.
        Date oldDay = previousRange.getBeginDate();
        while (oldDay.compareTo(getDateRange().getBeginDate()) < 0
               && oldDay.compareTo(previousRange.getEndDate()) <= 0)
        {
          if (!getAvailableInputsByDate().containsKey(oldDay))
          {
            throw new RuntimeException(String.format("Missing incremental data for %s, so can't remove it from previous output",PathUtils.datedPathFormat.format(oldDay)));
          }

          List<DatePath> oldInputs = getAvailableInputsByDate().get(oldDay);
          for (DatePath oldInput : oldInputs)
          {
            _log.info(String.format("Old Input: %s",oldInput.getPath()));
            plan._inputsToProcess.add(oldInput);
            plan._oldInputsToProcess.add(oldInput);
            // Later writes for the same root overwrite earlier ones, leaving the latest path.
            plan._latestInputByPath.put(PathUtils.getNestedPathRoot(oldInput.getPath()).toString(),
                                        oldInput.getPath().toString());
          }

          calendar.setTime(oldDay);
          calendar.add(Calendar.DAY_OF_MONTH, 1);
          oldDay = calendar.getTime();
        }

        plan._previousOutputToProcess = newestOutput;
        _log.info("Previous Output: " + newestOutput.getPath());
      }
    }

    // Consume the incremental data for the requested range to produce the final output.
    Date rangeStart = getDateRange().getBeginDate();
    Date rangeEnd = rangeStart;
    int daysWithNewData = 0;
    Date day = rangeStart;

    while (day.compareTo(getDateRange().getEndDate()) <= 0)
    {
      // The limit is checked before looking at the current day, so that day is left for a later pass.
      if (getMaxToProcess() != null && daysWithNewData >= getMaxToProcess())
      {
        if (!reusePreviousOutput)
        {
          throw new MaxInputDataExceededException(String.format("Amount of input data has exceeded max of %d however output is not being reused so cannot do in multiple passes", getMaxToProcess()));
        }

        // Too much data for a single run; another pass will pick up where this one stops.
        plan._needAnotherPass = true;
        break;
      }

      boolean coveredByPrevious = previousRange != null && day.compareTo(previousRange.getEndDate()) <= 0;
      if (!coveredByPrevious)
      {
        if (getAvailableInputsByDate().containsKey(day))
        {
          List<DatePath> newInputs = getAvailableInputsByDate().get(day);
          for (DatePath newInput : newInputs)
          {
            _log.info(String.format("New Input: %s",newInput.getPath()));
            plan._inputsToProcess.add(newInput);
            plan._newInputsToProcess.add(newInput);
            plan._latestInputByPath.put(PathUtils.getNestedPathRoot(newInput.getPath()).toString(),
                                        newInput.getPath().toString());
          }
          // The limit counts days that had data, not individual input paths.
          daysWithNewData++;
        }
        else if (isFailOnMissing())
        {
          throw new RuntimeException("missing " + PathUtils.datedPathFormat.format(day));
        }
        else
        {
          _log.info("No input data found for " + PathUtils.datedPathFormat.format(day));
        }
      }

      // Record a copy of the day just walked, then advance one calendar day.
      calendar.setTime(day);
      rangeEnd = calendar.getTime();
      calendar.add(Calendar.DAY_OF_MONTH, 1);
      day = calendar.getTime();
    }

    plan._currentDateRange = new DateRange(rangeStart, rangeEnd);
  }