in tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java [215:333]
/**
 * Runs the container's work loop: repeatedly fetches a task through a {@link ContainerReporter}
 * submitted to {@code executor}, executes it with a {@link TezTaskRunner2}, and keeps going until
 * the executor terminates or {@code isShutdown} is set, or until one of the early exits below.
 *
 * <p>The hooks named by {@link TezConfiguration#TEZ_TASK_ATTEMPT_HOOKS} (read from the task's
 * configuration) are started before each task attempt and stopped after it. Every hook that
 * started successfully is stopped, and {@link FileSystem#closeAllForUGI} is called for the
 * task's UGI, even when loading, starting or stopping a hook fails.
 *
 * @return {@code EXECUTION_FAILURE} if fetching work failed or a task reported an error;
 *     {@code INTERRUPTED} if interrupted while waiting for work; {@code SUCCESS} when asked to
 *     die or when the loop ends
 * @throws IOException propagated from task setup/execution or from closing the task's
 *     FileSystems
 * @throws InterruptedException propagated from task setup or execution
 * @throws TezException propagated from task setup or execution (e.g. hook instantiation)
 */
public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
  ContainerContext containerContext = new ContainerContext(containerIdString);
  ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
      getTaskMaxSleepTime);
  taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
      sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
  UserGroupInformation childUGI = null;
  while (!executor.isTerminated() && !isShutdown.get()) {
    if (taskCount > 0) {
      // Clear the per-attempt logger addend installed for the previous task attempt.
      TezUtilsInternal.updateLoggers(defaultConf, "", LoggingUtils.getPatternForTask(defaultConf));
    }
    ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
    ContainerTask containerTask;
    try {
      containerTask = getTaskFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      LOG.error("Error fetching new work for container {}", containerIdString,
          cause);
      shutdown();
      return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
          cause, "Execution Exception while fetching new work: " + e.getMessage());
    } catch (InterruptedException e) {
      LOG.info("Interrupted while waiting for new work for container {}", containerIdString);
      shutdown();
      return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
          "Interrupted while waiting for new work");
    }
    TezCommonUtils.logCredentials(LOG, containerTask.getCredentials(), "containerTask");
    if (containerTask.shouldDie()) {
      LOG.info("ContainerTask returned shouldDie=true for container {}, Exiting", containerIdString);
      shutdown();
      return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
          "Asked to die by the AM");
    }

    TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID();
    // Task-specific settings, if any, are merged into a copy so that defaultConf itself is not
    // modified by this task.
    Configuration taskConf = defaultConf;
    if (containerTask.getTaskSpec().getTaskConf() != null) {
      taskConf = new Configuration(defaultConf);
      TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), taskConf);
    }
    LoggingUtils.initLoggingContext(mdcContext, taskConf,
        attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString());

    String loggerAddend = attemptId.toString();
    taskCount++;
    String timeStamp =
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
    String startMessage = timeStamp + " Starting to run new task attempt: " + attemptId;
    System.err.println(startMessage);
    System.out.println(startMessage);
    TezUtilsInternal.setHadoopCallerContext(hadoopShim, attemptId);
    TezUtilsInternal.updateLoggers(defaultConf, loggerAddend,
        LoggingUtils.getPatternForTask(defaultConf));
    FileSystem.clearStatistics();
    childUGI = handleNewTaskCredentials(containerTask, childUGI);
    TezCommonUtils.logCredentials(LOG, childUGI.getCredentials(), "taskChildUGI");
    handleNewTaskLocalResources(containerTask, childUGI);
    cleanupOnTaskChanged(containerTask);
    // Execute the Actual Task
    TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
        localDirs, containerTask.getTaskSpec(), appAttemptNumber,
        serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
        executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters,
        hadoopShim, sharedExecutor);

    final String[] hookClasses = taskConf
        .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]);
    final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length];
    // Number of leading entries in 'hooks' whose start() completed; only those get stop().
    int startedHooks = 0;
    try {
      // Hooks are loaded and started inside the try so that a failure on one hook still stops
      // the hooks started before it and still closes the FileSystems of childUGI.
      for (String hookClass : hookClasses) {
        TezTaskAttemptHook hook = ReflectionUtils.createClazzInstance(hookClass);
        hook.start(attemptId, taskConf);
        hooks[startedHooks++] = hook;
      }
      TaskRunner2Result result = taskRunner.run();
      LOG.info("TaskRunner2Result: {}", result);
      if (result.isContainerShutdownRequested()) {
        LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down",
            containerIdString);
        shutdown();
        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
            "Asked to die by the AM");
      }
      Throwable taskError = result.getError();
      if (taskError != null) {
        handleError(taskError);
        return new ContainerExecutionResult(
            ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
            taskError, "TaskExecutionFailure: " + taskError.getMessage());
      }
    } finally {
      try {
        stopTaskAttemptHooks(hooks, startedHooks);
      } finally {
        // Runs even if a hook's stop() throws.
        FileSystem.closeAllForUGI(childUGI);
      }
    }
  }
  return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
      null);
}

/**
 * Stops the first {@code startedCount} hooks of {@code hooks}, in start order.
 *
 * <p>Every started hook gets its {@code stop()} call even if an earlier one throws a
 * {@link RuntimeException}. The first such failure is rethrown once all hooks have been
 * visited, with any later failures attached via {@link Throwable#addSuppressed}.
 *
 * @param hooks hooks for the current task attempt
 * @param startedCount number of leading entries of {@code hooks} whose {@code start()} succeeded
 */
private static void stopTaskAttemptHooks(TezTaskAttemptHook[] hooks, int startedCount) {
  RuntimeException firstFailure = null;
  for (int i = 0; i < startedCount; i++) {
    try {
      hooks[i].stop();
    } catch (RuntimeException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else if (e != firstFailure) {
        // addSuppressed rejects self-suppression, hence the identity check.
        firstFailure.addSuppressed(e);
      }
    }
  }
  if (firstFailure != null) {
    throw firstFailure;
  }
}