in datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java [337:545]
/**
 * Writes the job's counters, plus derived timing and task-attempt statistics,
 * to a ".counters.&lt;timestamp&gt;" file in HDFS.
 *
 * <p>The file is created under {@code _countersParentPath} when one is
 * configured (creating that directory if needed), otherwise under the job's
 * output path.  It contains one {@code NAME=value} line per Hadoop counter,
 * followed — only when all relevant task timestamps could be determined — by
 * wall-clock times for setup/map-reduce/cleanup phases, map and reduce task
 * duration statistics, and per-status task attempt counts.</p>
 *
 * @param fs file system on which to create the counters file
 * @throws IOException if the file cannot be created or written, or if the
 *                     job tracker cannot be queried for task reports
 */
private void writeCounters(final FileSystem fs) throws IOException
{
  final Path actualOutputPath = FileOutputFormat.getOutputPath(this);

  SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");

  String suffix = timestampFormat.format(new Date());

  if (_countersParentPath != null)
  {
    if (!fs.exists(_countersParentPath))
    {
      _log.info("Creating counter parent path " + _countersParentPath);
      fs.mkdirs(_countersParentPath, FsPermission.valueOf("-rwxrwxr-x"));
    }
    // make the name as unique as possible in this case because this may be a directory
    // where other counter files will be dropped
    _countersPath = new Path(_countersParentPath,".counters." + suffix);
  }
  else
  {
    _countersPath = new Path(actualOutputPath,".counters." + suffix);
  }

  _log.info(String.format("Writing counters to %s", _countersPath));

  FSDataOutputStream counterStream = fs.create(_countersPath);
  BufferedOutputStream buffer = new BufferedOutputStream(counterStream,256*1024);
  OutputStreamWriter writer = new OutputStreamWriter(buffer);
  // Close the writer in a finally block so the output file is not leaked when
  // one of the job-client queries below throws.  Closing the writer flushes
  // and closes the buffer and the underlying FSDataOutputStream, so the
  // individual closes of those two are unnecessary.
  try
  {
    // Raw Hadoop counters, one NAME=value line per counter.
    for (String groupName : getCounters().getGroupNames())
    {
      for (Counter counter : getCounters().getGroup(groupName))
      {
        writeAndLog(writer,String.format("%s=%d",counter.getName(),counter.getValue()));
      }
    }

    JobID jobID = this.getJobID();

    // The job client speaks the old mapred API, so translate the job ID.
    org.apache.hadoop.mapred.JobID oldJobId = new org.apache.hadoop.mapred.JobID(jobID.getJtIdentifier(),jobID.getId());

    long minStart = Long.MAX_VALUE;
    long maxFinish = 0;
    long setupStart = Long.MAX_VALUE;
    long cleanupFinish = 0;
    DescriptiveStatistics mapStats = new DescriptiveStatistics();
    DescriptiveStatistics reduceStats = new DescriptiveStatistics();
    boolean success = true;

    // Maps task IDs to SETUP/MAP/REDUCE/CLEANUP so attempt-completion events
    // can be categorized by phase below.
    Map<String,String> taskIdToType = new HashMap<String,String>();

    // Collect statistics on successful/failed/killed task attempts, categorized by setup/map/reduce/cleanup.
    // Unfortunately the job client doesn't have an easier way to get these statistics.
    Map<String,Integer> attemptStats = new HashMap<String,Integer>();

    JobClient jobClient = new JobClient(this.conf);
    try
    {
      // Setup phase: only the earliest start time matters.
      TaskReport[] setupReports = jobClient.getSetupTaskReports(oldJobId);
      if (setupReports.length > 0)
      {
        _log.info("Processing setup reports");
        // iterate the reports already fetched above rather than issuing a
        // second identical RPC to the job tracker
        for (TaskReport report : setupReports)
        {
          taskIdToType.put(report.getTaskID().toString(),"SETUP");
          if (report.getStartTime() == 0)
          {
            _log.warn("Skipping report with zero start time");
            continue;
          }
          setupStart = Math.min(setupStart, report.getStartTime());
        }
      }
      else
      {
        _log.error("No setup reports");
      }

      // Map phase: earliest start plus per-task durations.
      TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
      if (mapReports.length > 0)
      {
        _log.info("Processing map reports");
        for (TaskReport report : mapReports)
        {
          taskIdToType.put(report.getTaskID().toString(),"MAP");
          if (report.getFinishTime() == 0 || report.getStartTime() == 0)
          {
            _log.warn("Skipping report with zero start or finish time");
            continue;
          }
          minStart = Math.min(minStart, report.getStartTime());
          mapStats.addValue(report.getFinishTime() - report.getStartTime());
        }
      }
      else
      {
        _log.error("No map reports");
      }

      // Reduce phase: latest finish plus per-task durations.
      TaskReport[] reduceReports = jobClient.getReduceTaskReports(oldJobId);
      if (reduceReports.length > 0)
      {
        _log.info("Processing reduce reports");
        for (TaskReport report : reduceReports)
        {
          taskIdToType.put(report.getTaskID().toString(),"REDUCE");
          if (report.getFinishTime() == 0 || report.getStartTime() == 0)
          {
            _log.warn("Skipping report with zero start or finish time");
            continue;
          }
          maxFinish = Math.max(maxFinish, report.getFinishTime());
          reduceStats.addValue(report.getFinishTime() - report.getStartTime());
        }
      }
      else
      {
        _log.error("No reduce reports");
      }

      // Cleanup phase: only the latest finish time matters.
      TaskReport[] cleanupReports = jobClient.getCleanupTaskReports(oldJobId);
      if (cleanupReports.length > 0)
      {
        _log.info("Processing cleanup reports");
        for (TaskReport report : cleanupReports)
        {
          taskIdToType.put(report.getTaskID().toString(),"CLEANUP");
          if (report.getFinishTime() == 0)
          {
            _log.warn("Skipping report with finish time of zero");
            continue;
          }
          cleanupFinish = Math.max(cleanupFinish, report.getFinishTime());
        }
      }
      else
      {
        _log.error("No cleanup reports");
      }

      // If any phase boundary could not be determined, the derived timing
      // statistics would be meaningless, so suppress them entirely.
      if (minStart == Long.MAX_VALUE)
      {
        _log.error("Could not determine map-reduce start time");
        success = false;
      }
      if (maxFinish == 0)
      {
        _log.error("Could not determine map-reduce finish time");
        success = false;
      }
      if (setupStart == Long.MAX_VALUE)
      {
        _log.error("Could not determine setup start time");
        success = false;
      }
      if (cleanupFinish == 0)
      {
        _log.error("Could not determine cleanup finish time");
        success = false;
      }

      _log.info("Processing task attempts");
      for (TaskCompletionEvent event : getTaskCompletionEvents(jobClient,oldJobId))
      {
        String type = taskIdToType.get(event.getTaskAttemptId().getTaskID().toString());
        String status = event.getTaskStatus().toString();

        String key = String.format("%s_%s_ATTEMPTS",status,type);
        if (!attemptStats.containsKey(key))
        {
          attemptStats.put(key, 0);
        }
        attemptStats.put(key, attemptStats.get(key) + 1);
      }
    }
    finally
    {
      // release the job client's RPC resources even if a query above failed
      jobClient.close();
    }

    if (success)
    {
      writeAndLog(writer,String.format("SETUP_START_TIME_MS=%d",setupStart));
      writeAndLog(writer,String.format("CLEANUP_FINISH_TIME_MS=%d",cleanupFinish));
      writeAndLog(writer,String.format("COMPLETE_WALL_CLOCK_TIME_MS=%d",cleanupFinish - setupStart));
      writeAndLog(writer,String.format("MAP_REDUCE_START_TIME_MS=%d",minStart));
      writeAndLog(writer,String.format("MAP_REDUCE_FINISH_TIME_MS=%d",maxFinish));
      writeAndLog(writer,String.format("MAP_REDUCE_WALL_CLOCK_TIME_MS=%d",maxFinish - minStart));
      writeAndLog(writer,String.format("MAP_TOTAL_TASKS=%d",(long)mapStats.getN()));
      writeAndLog(writer,String.format("MAP_MAX_TIME_MS=%d",(long)mapStats.getMax()));
      writeAndLog(writer,String.format("MAP_MIN_TIME_MS=%d",(long)mapStats.getMin()));
      writeAndLog(writer,String.format("MAP_AVG_TIME_MS=%d",(long)mapStats.getMean()));
      writeAndLog(writer,String.format("MAP_STD_TIME_MS=%d",(long)mapStats.getStandardDeviation()));
      writeAndLog(writer,String.format("MAP_SUM_TIME_MS=%d",(long)mapStats.getSum()));
      writeAndLog(writer,String.format("REDUCE_TOTAL_TASKS=%d",(long)reduceStats.getN()));
      writeAndLog(writer,String.format("REDUCE_MAX_TIME_MS=%d",(long)reduceStats.getMax()));
      writeAndLog(writer,String.format("REDUCE_MIN_TIME_MS=%d",(long)reduceStats.getMin()));
      writeAndLog(writer,String.format("REDUCE_AVG_TIME_MS=%d",(long)reduceStats.getMean()));
      writeAndLog(writer,String.format("REDUCE_STD_TIME_MS=%d",(long)reduceStats.getStandardDeviation()));
      writeAndLog(writer,String.format("REDUCE_SUM_TIME_MS=%d",(long)reduceStats.getSum()));
      writeAndLog(writer,String.format("MAP_REDUCE_SUM_TIME_MS=%d",(long)mapStats.getSum() + (long)reduceStats.getSum()));
      for (Map.Entry<String, Integer> attemptStat : attemptStats.entrySet())
      {
        writeAndLog(writer,String.format("%s=%d",attemptStat.getKey(),attemptStat.getValue()));
      }
    }
  }
  finally
  {
    writer.close();
  }
}