private void execute()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java [380:568]


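  /**
   * Executes the incremental job. Each pass through the loop below plans and
   * runs one MapReduce job over the inputs that still need processing and
   * records a report for it; the loop ends when the plan is empty and fails
   * if the work does not finish within getMaxIterations() passes.
   */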
  private void execute() throws IOException, InterruptedException, ClassNotFoundException
  { 
    int iterations = 0;
    
    while (true)
    {
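      // Plan this pass: decide which new and old inputs to consume, whether
      // the previous output can be reused, and the date range covered.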
      PartitionCollapsingExecutionPlanner planner = new PartitionCollapsingExecutionPlanner(getFileSystem(), getProperties());
      planner.setInputPaths(getInputPaths());
      planner.setOutputPath(getOutputPath());
      planner.setStartDate(getStartDate());
      planner.setEndDate(getEndDate());
      planner.setDaysAgo(getDaysAgo());
      planner.setNumDays(getNumDays());
      planner.setMaxToProcess(getMaxToProcess());
      planner.setReusePreviousOutput(getReusePreviousOutput());
      planner.setFailOnMissing(isFailOnMissing());
      planner.createPlan();      
      
      if (planner.getInputsToProcess().isEmpty())
      {
        _log.info("Nothing to do");
        break;
      }
      
      if (iterations >= getMaxIterations())
      {
        throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
                                                 iterations,
                                                 getMaxIterations(),
                                                 planner.getInputsToProcess().size()));
      }
      
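      // Record what this pass consumes and produces so it can be reported
      // back to the caller once the run completes.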
      Report report = new Report();
      
      report.inputFiles.addAll(planner.getNewInputsToProcess());
      report.oldInputFiles.addAll(planner.getOldInputsToProcess());
      if (planner.getPreviousOutputToProcess() != null)
      {
        report.reusedOutput = planner.getPreviousOutputToProcess();
      }
      
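      // The output path is dated by the end of the range this pass covers.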
      DatePath outputPath = DatePath.createDatedPath(getOutputPath(), planner.getCurrentDateRange().getEndDate());
      
      _log.info("Output path: " + outputPath);
      
      Path tempOutputPath = createRandomTempPath();  
      
      _garbage.add(tempOutputPath);
            
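      // Write to a temporary path first; StagedOutputJob promotes the staged
      // output to the final dated path only if the job succeeds.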
      final StagedOutputJob job = StagedOutputJob.createStagedJob(
          getConf(),
          getName() + "-" + PathUtils.datedPathFormat.format(planner.getCurrentDateRange().getEndDate()),            
          null, // no input paths specified here; inputs are added individually below via MultipleInputs
          tempOutputPath.toString(),
          outputPath.getPath().toString(),
          _log);
      
      job.setCountersParentPath(getCountersParentPath());

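      // Register the planned input partitions. New and old inputs are both
      // read with the Avro multiple-inputs key format and a delegating mapper.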
      if (planner.getNewInputsToProcess() != null && !planner.getNewInputsToProcess().isEmpty())
      {
        _log.info("*** New Input data:");
        for (DatePath inputPath : planner.getNewInputsToProcess())
        {
          _log.info(inputPath.getPath());
          MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
        }
      }
      
      if (planner.getOldInputsToProcess() != null && !planner.getOldInputsToProcess().isEmpty())
      {
        _log.info("*** Old Input data:");
        for (DatePath inputPath : planner.getOldInputsToProcess())
        {
          _log.info(inputPath.getPath());
          MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
        }
      }
      
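      // When previous output is reused, it is fed back in through an identity
      // mapper so the reducer can merge it with the newly mapped data.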
      if (planner.getPreviousOutputToProcess() != null)
      {
        _log.info("*** Previous output data:");
        _log.info(planner.getPreviousOutputToProcess().getPath());
        MultipleInputs.addInputPath(job, planner.getPreviousOutputToProcess().getPath(), AvroKeyInputFormat.class, AvroKeyValueIdentityMapper.class);
      }
      
      final Configuration conf = job.getConfiguration();
      
      config(conf);
      
      AvroDateRangeMetadata.configureOutputDateRange(conf, planner.getCurrentDateRange());
                  
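      // Derive the Avro schemas for each stage from the user-supplied schemas
      // and the schemas discovered for each input path.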
      PartitionCollapsingSchemas spSchemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());
      
      job.setOutputFormatClass(AvroKeyWithMetadataOutputFormat.class);
      
      _log.info("Setting input path to schema mappings");
      for (String path : spSchemas.getMapInputSchemas().keySet())
      {
        Schema schema = spSchemas.getMapInputSchemas().get(path);
        _log.info("*** " + path);
        _log.info("*** => " + schema.toString());
        AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
      }
      
      AvroJob.setMapOutputKeySchema(job, spSchemas.getMapOutputKeySchema());
      AvroJob.setMapOutputValueSchema(job, spSchemas.getMapOutputValueSchema());
      AvroJob.setOutputKeySchema(job, spSchemas.getReduceOutputSchema());
                    
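      // An explicitly configured reducer count takes precedence over the
      // planner's estimate.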
      int numReducers;
            
      if (getNumReducers() != null)
      {
        numReducers = getNumReducers();        
        _log.info(String.format("Using %d reducers (fixed)", numReducers));
      }
      else      
      {         
        numReducers = planner.getNumReducers();      
        _log.info(String.format("Using %d reducers (computed)", numReducers));
      }
          
      job.setNumReduceTasks(numReducers);
      
      job.setReducerClass(DelegatingReducer.class);
      
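      // Serialize the mapper, reducer, and (optionally) combiner
      // implementations into the temp directory and point the tasks at them;
      // the delegating classes deserialize and invoke them at runtime.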
      Path mapperPath = new Path(tempOutputPath, ".mapper_impl");
      Path reducerPath = new Path(tempOutputPath, ".reducer_impl");
      Path combinerPath = new Path(tempOutputPath, ".combiner_impl");
      
      CollapsingMapper mapper = new CollapsingMapper();
      CollapsingReducer reducer = new CollapsingReducer();
      
      mapper.setSchemas(spSchemas);
      reducer.setSchemas(spSchemas);
      
      mapper.setMapper(getMapper());
      reducer.setAccumulator(getReducerAccumulator());
      reducer.setRecordMerger(getRecordMerger());
      reducer.setOldRecordMerger(getOldRecordMerger());

      mapper.setReuseOutput(_reusePreviousOutput);
      reducer.setReuseOutput(_reusePreviousOutput);
      
      configureOutputDateRange(conf, planner.getCurrentDateRange(), reducer);
      
      DistributedCacheHelper.writeObject(conf, mapper, mapperPath);
      DistributedCacheHelper.writeObject(conf, reducer, reducerPath);
      
      conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
      conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
      
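      // The combiner is optional and is wired up the same way as the mapper
      // and reducer.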
      if (isUseCombiner())
      {
        CollapsingCombiner combiner = new CollapsingCombiner();
        configureOutputDateRange(conf, planner.getCurrentDateRange(), combiner);
        combiner.setReuseOutput(_reusePreviousOutput);
        combiner.setSchemas(spSchemas);
        combiner.setAccumulator(getCombinerAccumulator());
        conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
        job.setCombinerClass(DelegatingCombiner.class);
        DistributedCacheHelper.writeObject(conf, combiner, combinerPath);
      }
      
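      // Run the pass synchronously and fail fast if it does not succeed.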
      if (!job.waitForCompletion(true))
      {
        _log.error("Job failed! Quitting...");
        throw new RuntimeException("Job failed");
      }
      
      report.jobId = job.getJobID().toString();
      report.jobName = job.getJobName();
      report.countersPath = job.getCountersPath();
      report.outputPath = outputPath;
      
      _reports.add(report);
      
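      // Delete output beyond the configured retention count, if any.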
      applyRetention();
      
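      // Another pass is needed only when maxToProcess capped this one and
      // unprocessed inputs remain; otherwise we are done.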
      if (!planner.getNeedsAnotherPass())
      {
        break;
      }
      
      cleanup();
      
      iterations++;
    }
  }