private List getTaskCompletionEvents()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java [555:646]


  /**
   * Collects every task completion event reported for the given job.
   *
   * <p>Events are fetched in batches starting at offset 0 until an empty batch is returned.
   * Where possible the batches are fetched by reflectively calling the three-argument
   * <code>getTaskCompletionEvents</code> method on the private <code>jobSubmitClient</code> field
   * of {@link JobClient}, which accepts a batch size.  If that field or method cannot be reached,
   * or a reflective call fails part way through, the method falls back to
   * {@link #getTaskCompletionEvents(int)} and continues from the offset already reached.</p>
   *
   * @param jobClient client whose private <code>jobSubmitClient</code> field is probed via reflection
   * @param jobId identifier of the job whose events are requested
   * @return all events collected, in the order they were returned; empty if there are none
   * @throws IOException declared for callers; may be raised by the non-reflective fallback
   */
  private List<TaskCompletionEvent> getTaskCompletionEvents(JobClient jobClient, org.apache.hadoop.mapred.JobID jobId) throws IOException
  {
    // Number of events requested per reflective call.
    final int reflectiveBatchSize = 250;

    List<TaskCompletionEvent> events = new ArrayList<TaskCompletionEvent>();

    // Tries to use reflection to get access to the getTaskCompletionEvents method from the private jobSubmitClient field.
    // This method has a parameter for the size, which defaults to 10 for the top level methods and can therefore be extremely slow
    // if the goal is to get all events.

    Method getTaskCompletionEventsMethod = null;
    Object jobSubmitClient = null;

    // Remembers why the reflective lookup failed so the reason can be logged once below
    // instead of being silently discarded.
    Exception lookupFailure = null;

    try
    {
      Field f = JobClient.class.getDeclaredField("jobSubmitClient");
      f.setAccessible(true);
      jobSubmitClient = f.get(jobClient);

      if (jobSubmitClient != null)
      {
        getTaskCompletionEventsMethod = jobSubmitClient.getClass().getDeclaredMethod("getTaskCompletionEvents", org.apache.hadoop.mapred.JobID.class,int.class,int.class);
        getTaskCompletionEventsMethod.setAccessible(true);
      }
    }
    catch (NoSuchMethodException e)
    {
      lookupFailure = e;
    }
    catch (SecurityException e)
    {
      lookupFailure = e;
    }
    catch (NoSuchFieldException e)
    {
      lookupFailure = e;
    }
    catch (IllegalArgumentException e)
    {
      lookupFailure = e;
    }
    catch (IllegalAccessException e)
    {
      lookupFailure = e;
    }

    if (lookupFailure != null)
    {
      // Best effort only: the slow path below still works, so this is informational.
      _log.info("Reflective lookup of getTaskCompletionEvents did not complete cleanly: " + lookupFailure);
    }

    if (getTaskCompletionEventsMethod != null)
    {
      _log.info("Will call getTaskCompletionEvents via reflection since it's faster");
    }
    else
    {
      _log.info("Will call getTaskCompletionEvents via the slow method");
    }

    int index = 0;
    while(true)
    {
      TaskCompletionEvent[] currentEvents = null;
      if (getTaskCompletionEventsMethod != null)
      {
        Exception invocationFailure = null;
        try
        {
          // grab events, 250 at a time, which is faster than the other method which defaults to 10 at a time (with no override ability)
          currentEvents = (TaskCompletionEvent[])getTaskCompletionEventsMethod.invoke(jobSubmitClient, jobId, index, reflectiveBatchSize);
        }
        catch (IllegalArgumentException e)
        {
          invocationFailure = e;
        }
        catch (IllegalAccessException e)
        {
          invocationFailure = e;
        }
        catch (InvocationTargetException e)
        {
          invocationFailure = e;
        }

        if (invocationFailure != null)
        {
          // Abandon reflection for the rest of the loop; index is left unchanged so the
          // direct method resumes from the same offset on the next iteration.
          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", invocationFailure);
          getTaskCompletionEventsMethod = null;
          continue;
        }
      }
      else
      {
        currentEvents = this.getTaskCompletionEvents(index);
      }

      // An empty batch marks the end of the events; a null batch is treated the same way
      // rather than failing with a NullPointerException.
      if (currentEvents == null || currentEvents.length == 0) break;

      for (TaskCompletionEvent event : currentEvents)
      {
        events.add(event);
      }
      index += currentEvents.length;
    }

    return events;
  }