private void execute()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java [318:518]


  private void execute() throws IOException, InterruptedException, ClassNotFoundException
  {  
    int iterations = 0;
    
    while(true)
    {
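      // Plan this pass: determine which dated input paths still need to be
      // processed, given the configured date range and processing limits.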
      PartitionPreservingExecutionPlanner planner = new PartitionPreservingExecutionPlanner(getFileSystem(),getProperties());
      planner.setInputPaths(getInputPaths());
      planner.setOutputPath(getOutputPath());
      planner.setStartDate(getStartDate());
      planner.setEndDate(getEndDate());
      planner.setDaysAgo(getDaysAgo());
      planner.setNumDays(getNumDays());
      planner.setMaxToProcess(getMaxToProcess());
      planner.setFailOnMissing(isFailOnMissing());
      planner.createPlan();
      
      if (planner.getInputsToProcess().size() == 0)
      {
        _log.info("Found all necessary incremental data");
        break;
      }
      
      if (iterations >= getMaxIterations())
      {
        throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
                                                 iterations,
                                                 getMaxIterations(),
                                                 planner.getInputsToProcess().size()));
      }
      
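      // Create a temporary working directory for this pass and register it for
      // cleanup; job output is staged under it and only moved into place on success.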
      Path jobTempPath = createRandomTempPath();  
      _garbage.add(jobTempPath);
      ensurePath(getOutputPath());
      
      Path incrementalStagingPath = ensurePath(new Path(jobTempPath,".incremental-staging"));
      Path incrementalStagingTmpPath = ensurePath(new Path(jobTempPath,".incremental-staging-tmp"));
      
      Report report = new Report();    
           
      // create input paths for job
      List<String> inputPaths = new ArrayList<String>();
      for (DatePath input : planner.getInputsToProcess())
      {
        inputPaths.add(input.getPath().toString());
        report.inputFiles.add(input);
      }
      
      _log.info("Staging path: " + incrementalStagingPath);
      final StagedOutputJob job = StagedOutputJob.createStagedJob(
                                    getConf(),
                                    getName() + "-" + "incremental",            
                                    inputPaths,
                                    incrementalStagingTmpPath.toString(),
                                    incrementalStagingPath.toString(),
                                    _log);
              
      job.setCountersParentPath(getCountersParentPath());
      
      final Configuration conf = job.getConfiguration();
      
      config(conf);
      
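      // Bundle the task's schemas with the per-path input schemas discovered by
      // the planner; these drive the Avro input/output configuration below.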
      PartitionPreservingSchemas fpSchemas = new PartitionPreservingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace() );
      
      job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);
      
      job.setOutputFormatClass(AvroKeyOutputFormat.class);
      
      _log.info("Setting input path to schema mappings");
      for (String path : fpSchemas.getMapInputSchemas().keySet())
      {
        Schema schema = fpSchemas.getMapInputSchemas().get(path);
        _log.info("*** " + path);
        _log.info("*** => " + schema.toString());
        AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
      }
            
      AvroJob.setMapOutputKeySchema(job, fpSchemas.getMapOutputKeySchema());
      AvroJob.setMapOutputValueSchema(job, fpSchemas.getMapOutputValueSchema());
      AvroJob.setOutputKeySchema(job, fpSchemas.getReduceOutputSchema());
            
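      // Register one named output per date being processed so each day's records
      // land in a separate partition, and collect the input times for the partitioner.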
      StringBuilder inputTimesJoined = new StringBuilder();
      for (Date input : planner.getDatesToProcess())
      {
        String namedOutput = PathUtils.datedPathFormat.format(input);
        _log.info(String.format("Adding named output %s",namedOutput));
        AvroMultipleOutputs.addNamedOutput(job, 
                                           namedOutput, 
                                           AvroKeyOutputFormat.class, 
                                           fpSchemas.getReduceOutputSchema());
        
        inputTimesJoined.append(Long.toString(input.getTime()));
        inputTimesJoined.append(",");
      }
      
      int numReducers;
      
      if (getNumReducers() != null)
      {
        numReducers = getNumReducers();
        _log.info(String.format("Using %d reducers (fixed)",numReducers));
      }
      else
      {
        numReducers = planner.getNumReducers();
        _log.info(String.format("Using %d reducers (computed)",numReducers));
      }
      
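      // Spread the reducers evenly across the dates being processed; the
      // TimePartitioner uses this to assign each date its own set of reducers.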
      int avgReducersPerInput = (int)Math.ceil(numReducers/(double)planner.getDatesToProcess().size());
      
      _log.info(String.format("Reducers per input path: %d", avgReducersPerInput));    
      
      // counters for multiple outputs
      // conf.set("mo.counters", "true");
      
      conf.set(TimePartitioner.REDUCERS_PER_INPUT, Integer.toString(avgReducersPerInput));
      conf.set(TimePartitioner.INPUT_TIMES, inputTimesJoined.substring(0,inputTimesJoined.length()-1));    
              
      job.setNumReduceTasks(numReducers);
      
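      // Serialize the mapper, reducer, and (optionally) combiner implementations to
      // the staging area; the delegating classes deserialize them at task runtime.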
      Path mapperPath = new Path(incrementalStagingPath,".mapper_impl");
      Path reducerPath = new Path(incrementalStagingPath,".reducer_impl");
      Path combinerPath = new Path(incrementalStagingPath,".combiner_impl");
      
      conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
      conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
      
      _mapper = new PartitioningMapper();
      _mapper.setSchemas(fpSchemas);
      _mapper.setMapper(getMapper());

      _reducer = new PartitioningReducer();
      _reducer.setSchemas(fpSchemas);
      _reducer.setAccumulator(getReducerAccumulator());
      
      DistributedCacheHelper.writeObject(conf, getMapProcessor(), mapperPath);
      DistributedCacheHelper.writeObject(conf, getReduceProcessor(), reducerPath);
      
      job.setMapperClass(DelegatingMapper.class);
      job.setReducerClass(DelegatingReducer.class);
      
      if (isUseCombiner())
      {
        _combiner = new PartitioningCombiner();
        _combiner.setAccumulator(getCombinerAccumulator());
        conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
        job.setCombinerClass(DelegatingCombiner.class);
        DistributedCacheHelper.writeObject(conf, getCombineProcessor(), combinerPath);
      }
      
      job.setPartitionerClass(TimePartitioner.class);
            
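      // Run the job and fail fast if it does not complete successfully.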
      if (!job.waitForCompletion(true))
      {
        _log.error("Job failed! Quitting...");
        throw new RuntimeException("Job failed");
      }
      
      report.jobName = job.getJobName();
      report.jobId = job.getJobID().toString();
        
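      // Promote the staged output files into their final dated locations under
      // the output path.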
      moveStagedFiles(report,incrementalStagingPath);
      
      if (getCountersParentPath() == null && job.getCountersPath() != null)
      {
        // save the counters in the target path, for lack of a better place to put it
        Path counters = job.getCountersPath();
        if (getFileSystem().exists(counters))
        {
          Path target = new Path(getOutputPath(),counters.getName());
          if (getFileSystem().exists(target))
          {
            _log.info(String.format("Removing old counters at %s",target));
            getFileSystem().delete(target, true);
          }
          _log.info(String.format("Moving %s to %s",counters.getName(),getOutputPath()));
          getFileSystem().rename(counters, target);
          
          report.countersPath = target;
        }
        else
        {
          _log.error("Could not find counters at " + counters);
        }
      }

      applyRetention();
              
      _reports.add(report);
      
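      // If the planner limited this pass (e.g. by the max-to-process setting), clean
      // up and run another pass; otherwise all inputs have been consumed.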
      if (!planner.getNeedsAnotherPass())
      {
        break;
      }
      
      cleanup();
      
      iterations++;
    }
  }