public void run()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java [150:284]


  /**
   * Runs the non-incremental job. All dated inputs in the planned date range are processed in a
   * single MapReduce job, which writes one dated output directory.
   *
   * <p>The method performs these steps, in order:
   * <ol>
   *   <li>Validates that exactly one input path is configured.</li>
   *   <li>Creates the output path if it does not exist yet.</li>
   *   <li>Finds the dated paths under the input path via {@code PathUtils.findNestedDatedPaths}.</li>
   *   <li>Plans the date range from the start/end date, days-ago and num-days settings. An input
   *       must exist for every day in that range.</li>
   *   <li>Configures and runs a staged job. Its output directory is named after the date of the
   *       last input in the range.</li>
   *   <li>Fills in the report and, if a retention count is set, keeps only the latest dated
   *       outputs.</li>
   * </ol>
   *
   * @throws IOException if a file system operation fails
   * @throws InterruptedException if interrupted while waiting for the job to complete
   * @throws ClassNotFoundException if a job class cannot be loaded
   * @throws RuntimeException if zero or more than one input path is configured, if no dated input
   *     data exists, if a day in the planned range has no input, or if the job fails
   */
  public void run() throws IOException, InterruptedException, ClassNotFoundException
  {
    _report = new Report();

    // Validate configuration before creating anything, so a misconfigured job has no side effects.
    if (getInputPaths().isEmpty())
    {
      throw new RuntimeException("No input path specified");
    }
    if (getInputPaths().size() > 1)
    {
      throw new RuntimeException("Only a single input is supported");
    }

    if (!getFileSystem().exists(getOutputPath()))
    {
      getFileSystem().mkdirs(getOutputPath());
    }

    List<DatePath> inputs = PathUtils.findNestedDatedPaths(getFileSystem(), getInputPaths().get(0));
    if (inputs.isEmpty())
    {
      throw new RuntimeException("no input data available");
    }

    // The last element is treated as the most recent input.
    // NOTE(review): presumably findNestedDatedPaths returns paths sorted by date - confirm.
    DatePath latestInput = inputs.get(inputs.size() - 1);

    // Collect the available dates for range planning, and index the inputs by date for lookup.
    List<Date> dates = new ArrayList<Date>(inputs.size());
    Map<Date,DatePath> existingInputs = new HashMap<Date,DatePath>();
    for (DatePath input : inputs)
    {
      dates.add(input.getDate());
      existingInputs.put(input.getDate(), input);
    }

    DateRange dateRange = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), dates, getDaysAgo(), getNumDays(), true);

    // The schema comes from the newest input available, which may be later than the end of the
    // planned range. NOTE(review): presumably intentional, so older data is read with the newest
    // schema - confirm.
    _log.info("Getting schema for input " + latestInput.getPath());
    Schema inputSchema = PathUtils.getSchemaFromPath(getFileSystem(),latestInput.getPath());

    ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProperties());

    // Walk the planned range one day at a time. Every day must have an input.
    List<String> inputPaths = new ArrayList<String>();
    Calendar cal = Calendar.getInstance(PathUtils.timeZone);
    for (Date currentDate = dateRange.getBeginDate(); currentDate.compareTo(dateRange.getEndDate()) <= 0; )
    {
      DatePath input = existingInputs.get(currentDate);
      if (input == null)
      {
        throw new RuntimeException(String.format("Missing input for %s",PathUtils.datedPathFormat.format(currentDate)));
      }

      _log.info(String.format("Processing %s",input.getPath()));
      inputPaths.add(input.getPath().toString());
      estimator.addInputPath(input.getPath());
      _report.inputFiles.add(input);
      // The output is dated after the last input actually processed within the range.
      latestInput = input;

      cal.setTime(currentDate);
      cal.add(Calendar.DAY_OF_MONTH, 1);
      currentDate = cal.getTime();
    }

    String outputDate = PathUtils.datedPathFormat.format(latestInput.getDate());
    Path timestampOutputPath = new Path(getOutputPath(),outputDate);

    // The job is staged under a /tmp-prefixed copy of the final output path.
    // NOTE(review): if getOutputPath() is fully qualified (scheme://...), this concatenation yields
    // a malformed path - confirm that output paths are unqualified.
    final StagedOutputJob job = StagedOutputJob.createStagedJob(
                                          getConf(),
                                          getName() + "-" + outputDate,
                                          inputPaths,
                                          "/tmp" + timestampOutputPath.toString(),
                                          timestampOutputPath.toString(),
                                          _log);

    job.setCountersParentPath(getCountersParentPath());

    if (_combineInputs)
    {
      job.setInputFormatClass(CombinedAvroKeyInputFormat.class);
    }
    else
    {
      job.setInputFormatClass(AvroKeyInputFormat.class);
    }

    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    AvroJob.setInputKeySchema(job, inputSchema);
    AvroJob.setMapOutputKeySchema(job, getMapOutputKeySchema());
    AvroJob.setMapOutputValueSchema(job, getMapOutputValueSchema());
    AvroJob.setOutputKeySchema(job, getReduceOutputSchema());

    // A configured reducer count takes precedence over the size-based estimate.
    int numReducers;
    if (getNumReducers() != null)
    {
      numReducers = getNumReducers();
      _log.info(String.format("Using %d reducers (fixed)",numReducers));
    }
    else
    {
      numReducers = estimator.getNumReducers();
      _log.info(String.format("Using %d reducers (computed)",numReducers));
    }

    job.setNumReduceTasks(numReducers);

    job.setMapperClass(getMapperClass());
    job.setReducerClass(getReducerClass());

    if (isUseCombiner() && getCombinerClass() != null)
    {
      job.setCombinerClass(getCombinerClass());
    }

    // Let subclasses apply any extra job configuration before submission.
    config(job.getConfiguration());

    if (!job.waitForCompletion(true))
    {
      _log.error("Job failed! Quitting...");
      throw new RuntimeException("Job failed");
    }

    _report.jobId = job.getJobID().toString();
    _report.jobName = job.getJobName();
    _report.countersPath = job.getCountersPath();
    _report.outputFile = new DatePath(latestInput.getDate(),timestampOutputPath);

    if (getRetentionCount() != null)
    {
      PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount());
    }
  }