in datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java [555:646]
private List<TaskCompletionEvent> getTaskCompletionEvents(JobClient jobClient, org.apache.hadoop.mapred.JobID jobId) throws IOException
{
List<TaskCompletionEvent> events = new ArrayList<TaskCompletionEvent>();
// Tries to use reflection to get access to the getTaskCompletionEvents method from the private jobSubmitClient field.
// This method has a parameter for the size, which defaults to 10 for the top level methods and can therefore be extremely slow
// if the goal is to get all events.
Method getTaskCompletionEventsMethod = null;
Object jobSubmitClient = null;
try
{
Field f = JobClient.class.getDeclaredField("jobSubmitClient");
f.setAccessible(true);
jobSubmitClient = f.get(jobClient);
if (jobSubmitClient != null)
{
getTaskCompletionEventsMethod = jobSubmitClient.getClass().getDeclaredMethod("getTaskCompletionEvents", org.apache.hadoop.mapred.JobID.class,int.class,int.class);
getTaskCompletionEventsMethod.setAccessible(true);
}
}
catch (NoSuchMethodException e)
{
}
catch (SecurityException e)
{
}
catch (NoSuchFieldException e)
{
}
catch (IllegalArgumentException e)
{
}
catch (IllegalAccessException e)
{
}
if (getTaskCompletionEventsMethod != null)
{
_log.info("Will call getTaskCompletionEvents via reflection since it's faster");
}
else
{
_log.info("Will call getTaskCompletionEvents via the slow method");
}
int index = 0;
while(true)
{
TaskCompletionEvent[] currentEvents;
if (getTaskCompletionEventsMethod != null)
{
try
{
// grab events, 250 at a time, which is faster than the other method which defaults to 10 at a time (with no override ability)
currentEvents = (TaskCompletionEvent[])getTaskCompletionEventsMethod.invoke(jobSubmitClient, jobId, index, 250);
}
catch (IllegalArgumentException e)
{
_log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
getTaskCompletionEventsMethod = null;
continue;
}
catch (IllegalAccessException e)
{
_log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
getTaskCompletionEventsMethod = null;
continue;
}
catch (InvocationTargetException e)
{
_log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
getTaskCompletionEventsMethod = null;
continue;
}
}
else
{
currentEvents = this.getTaskCompletionEvents(index);
}
if (currentEvents.length == 0) break;
for (TaskCompletionEvent event : currentEvents)
{
events.add(event);
}
index += currentEvents.length;
}
return events;
}