private void writeCounters(final FileSystem fs)

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java [337:545]


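  /**
   * Writes the job's counters, along with derived timing and task attempt statistics,
   * to a ".counters.<timestamp>" file, either under the configured counters parent
   * path or under the job's output path.  The file contains one NAME=VALUE pair per
   * line, for example (names and values below are illustrative):
   *
   *   MAP_TOTAL_TASKS=42
   *   MAP_REDUCE_WALL_CLOCK_TIME_MS=123456
   *
   * @param fs the file system to write the counters file to
   * @throws IOException if the counters file cannot be written
   */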
  private void writeCounters(final FileSystem fs) throws IOException
  {
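    // The counters file name includes a timestamp so that repeated runs produce
    // distinct file names.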
    final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
    
    SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    
    String suffix = timestampFormat.format(new Date());
    
    if (_countersParentPath != null)
    {
      if (!fs.exists(_countersParentPath))
      {
        _log.info("Creating counter parent path " + _countersParentPath);
        fs.mkdirs(_countersParentPath, FsPermission.valueOf("-rwxrwxr-x"));
      }
      // Make the name as unique as possible, since this may be a directory where
      // counter files from other jobs are also dropped.
      _countersPath = new Path(_countersParentPath,".counters." + suffix);
    }
    else
    {
      _countersPath = new Path(actualOutputPath,".counters." + suffix);
    }
    
    _log.info(String.format("Writing counters to %s", _countersPath));
    FSDataOutputStream counterStream = fs.create(_countersPath);
    BufferedOutputStream buffer = new BufferedOutputStream(counterStream,256*1024);
    OutputStreamWriter writer = new OutputStreamWriter(buffer);      
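    // Write every counter from every group as NAME=VALUE, one per line.  The group
    // name is not included, so counters sharing a name across groups each produce
    // their own line.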
    for (String groupName : getCounters().getGroupNames())
    {
      for (Counter counter : getCounters().getGroup(groupName))
      {
        writeAndLog(writer,String.format("%s=%d",counter.getName(),counter.getValue()));
      }
    }
          
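    // Task reports are fetched through the old mapred API, so convert the job ID
    // to its old-style equivalent for use with JobClient.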
    JobID jobID = this.getJobID();
    
    org.apache.hadoop.mapred.JobID oldJobId = new org.apache.hadoop.mapred.JobID(jobID.getJtIdentifier(),jobID.getId());
    
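    // Aggregate task timings across the four phases: earliest map start, latest
    // reduce finish, setup start, and cleanup finish, plus per-task duration
    // statistics for maps and reduces.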
    long minStart = Long.MAX_VALUE;      
    long maxFinish = 0;
    long setupStart = Long.MAX_VALUE;
    long cleanupFinish = 0;
    DescriptiveStatistics mapStats = new DescriptiveStatistics();
    DescriptiveStatistics reduceStats = new DescriptiveStatistics();
    boolean success = true;
    
    JobClient jobClient = new JobClient(this.conf);
    
    Map<String,String> taskIdToType = new HashMap<String,String>();
               
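    // Walk the task reports for each phase, recording each task's phase in
    // taskIdToType so attempt counts can be bucketed by phase later.  Reports
    // with zero start or finish times are skipped.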
    TaskReport[] setupReports = jobClient.getSetupTaskReports(oldJobId);
    if (setupReports.length > 0)
    {
      _log.info("Processing setup reports");
      for (TaskReport report : setupReports)
      {
        taskIdToType.put(report.getTaskID().toString(),"SETUP");
        if (report.getStartTime() == 0)
        {
          _log.warn("Skipping report with zero start time");
          continue;
        }
        setupStart = Math.min(setupStart, report.getStartTime());
      }
    }
    else
    {
      _log.error("No setup reports");
    }
    
    TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
    if (mapReports.length > 0)
    {
      _log.info("Processing map reports");
      for (TaskReport report : mapReports)
      {
        taskIdToType.put(report.getTaskID().toString(),"MAP");
        if (report.getFinishTime() == 0 || report.getStartTime() == 0)
        {
          _log.warn("Skipping report with zero start or finish time");
          continue;
        }
        minStart = Math.min(minStart, report.getStartTime());
        mapStats.addValue(report.getFinishTime() - report.getStartTime());
      }
    }
    else
    {
      _log.error("No map reports");
    }
    
    TaskReport[] reduceReports = jobClient.getReduceTaskReports(oldJobId);
    if (reduceReports.length > 0)
    {
      _log.info("Processing reduce reports");
      for (TaskReport report : reduceReports)
      {      
        taskIdToType.put(report.getTaskID().toString(),"REDUCE");
        if (report.getFinishTime() == 0 || report.getStartTime() == 0)
        {
          _log.warn("Skipping report with zero start or finish time");
          continue;
        }
        maxFinish = Math.max(maxFinish, report.getFinishTime());
        reduceStats.addValue(report.getFinishTime() - report.getStartTime());
      }
    }
    else
    {
      _log.error("No reduce reports");
    }
    
    TaskReport[] cleanupReports = jobClient.getCleanupTaskReports(oldJobId);
    if (cleanupReports.length > 0)
    {
      _log.info("Processing cleanup reports");
      for (TaskReport report : cleanupReports)
      {
        taskIdToType.put(report.getTaskID().toString(),"CLEANUP");
        if (report.getFinishTime() == 0)
        {
          _log.warn("Skipping report with finish time of zero");
          continue;
        }
        cleanupFinish = Math.max(cleanupFinish, report.getFinishTime());
      }
    }
    else
    {
      _log.error("No cleanup reports");
    }
      
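    // If any phase boundary could not be determined, the derived timing counters
    // below are not written.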
    if (minStart == Long.MAX_VALUE)
    {
      _log.error("Could not determine map-reduce start time");
      success = false;
    }      
    if (maxFinish == 0)
    {
      _log.error("Could not determine map-reduce finish time");
      success = false;
    }
    
    if (setupStart == Long.MAX_VALUE)
    {
      _log.error("Could not determine setup start time");
      success = false;
    }      
    if (cleanupFinish == 0)
    {
      _log.error("Could not determine cleanup finish time");
      success = false;
    }     
    
    // Collect statistics on successful/failed/killed task attempts, categorized by setup/map/reduce/cleanup.
    // Unfortunately the job client doesn't have an easier way to get these statistics.
    Map<String,Integer> attemptStats = new HashMap<String,Integer>();
    _log.info("Processing task attempts");            
    for (TaskCompletionEvent event : getTaskCompletionEvents(jobClient,oldJobId))
    {
      String type = taskIdToType.get(event.getTaskAttemptId().getTaskID().toString());
      String status = event.getTaskStatus().toString();
      
      String key = String.format("%s_%s_ATTEMPTS",status,type);
      if (!attemptStats.containsKey(key))
      {
        attemptStats.put(key, 0);
      }
      attemptStats.put(key, attemptStats.get(key) + 1);
    }
              
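    // Write the setup/cleanup wall-clock times, map and reduce timing statistics,
    // and per-phase attempt counts.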
    if (success)
    {
      writeAndLog(writer,String.format("SETUP_START_TIME_MS=%d",setupStart));
      writeAndLog(writer,String.format("CLEANUP_FINISH_TIME_MS=%d",cleanupFinish));
      writeAndLog(writer,String.format("COMPLETE_WALL_CLOCK_TIME_MS=%d",cleanupFinish - setupStart));
      
      writeAndLog(writer,String.format("MAP_REDUCE_START_TIME_MS=%d",minStart));
      writeAndLog(writer,String.format("MAP_REDUCE_FINISH_TIME_MS=%d",maxFinish));
      writeAndLog(writer,String.format("MAP_REDUCE_WALL_CLOCK_TIME_MS=%d",maxFinish - minStart));
      
      writeAndLog(writer,String.format("MAP_TOTAL_TASKS=%d",(long)mapStats.getN()));
      writeAndLog(writer,String.format("MAP_MAX_TIME_MS=%d",(long)mapStats.getMax()));
      writeAndLog(writer,String.format("MAP_MIN_TIME_MS=%d",(long)mapStats.getMin()));
      writeAndLog(writer,String.format("MAP_AVG_TIME_MS=%d",(long)mapStats.getMean()));
      writeAndLog(writer,String.format("MAP_STD_TIME_MS=%d",(long)mapStats.getStandardDeviation()));
      writeAndLog(writer,String.format("MAP_SUM_TIME_MS=%d",(long)mapStats.getSum()));
      
      writeAndLog(writer,String.format("REDUCE_TOTAL_TASKS=%d",(long)reduceStats.getN()));
      writeAndLog(writer,String.format("REDUCE_MAX_TIME_MS=%d",(long)reduceStats.getMax()));
      writeAndLog(writer,String.format("REDUCE_MIN_TIME_MS=%d",(long)reduceStats.getMin()));
      writeAndLog(writer,String.format("REDUCE_AVG_TIME_MS=%d",(long)reduceStats.getMean()));
      writeAndLog(writer,String.format("REDUCE_STD_TIME_MS=%d",(long)reduceStats.getStandardDeviation()));
      writeAndLog(writer,String.format("REDUCE_SUM_TIME_MS=%d",(long)reduceStats.getSum()));
      
      writeAndLog(writer,String.format("MAP_REDUCE_SUM_TIME_MS=%d",(long)mapStats.getSum() + (long)reduceStats.getSum()));
      
      for (Map.Entry<String, Integer> attemptStat : attemptStats.entrySet())
      {
        writeAndLog(writer,String.format("%s=%d",attemptStat.getKey(),attemptStat.getValue()));
      }
    }
    
    writer.close();
    buffer.close();
    counterStream.close();
  }