in tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java [176:297]
/**
 * Runs the task to completion and returns its end result.
 *
 * <p>If the runner is still in a running state, the task is registered with the task reporter
 * and submitted to the executor, and this method blocks until the callable finishes. If the
 * runner is no longer in a running state, the task is never started and the already recorded
 * end result is returned.
 *
 * <p>Once the callable has finished, the final status is sent through the task reporter for the
 * SUCCESS end reason, and for TASK_ERROR when the error has not already been reported. The
 * remaining end reasons return the recorded result without further communication.
 *
 * <p>Before returning, this method waits for any in-progress out-of-band fatal error signal,
 * unregisters the task from the reporter, shuts down the local executor if one is set, and
 * clears the calling thread's interrupt status.
 *
 * @return the result describing why and how the task ended
 */
public TaskRunner2Result run() {
  try {
    Future<TaskRunner2CallableResult> future = null;
    synchronized (this) {
      // All running state changes must be made within a synchronized block to ensure
      // kills are issued or the task is not setup.
      if (isRunningState()) {
        // Safe to do this within a synchronized block because we're providing
        // the handler on which the Reporter will communicate back. Assuming
        // the register call doesn't end up hanging.
        taskRunnerCallable = new TaskRunner2Callable(task, ugi,
            umbilicalAndErrorHandler);
        taskReporter.registerTask(task, umbilicalAndErrorHandler);
        future = executor.submit(taskRunnerCallable);
      }
    }

    if (future == null) {
      // The task was never started; return whatever end state was recorded beforehand.
      return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
          stopContainerRequested.get());
    }

    TaskRunner2CallableResult executionResult = null;
    // The task started. Wait for it to complete.
    try {
      executionResult = future.get();
    } catch (Throwable e) {
      if (e instanceof ExecutionException) {
        // Unwrap so that the failure recorded is the one raised by the callable itself.
        e = e.getCause();
      }
      synchronized (this) {
        // Only record this failure if no other end reason has been registered yet.
        if (isRunningState()) {
          trySettingEndReason(EndReason.TASK_ERROR);
          registerFirstException(getTaskFailureType(e), e, null);
          LOG.warn("Exception from RunnerCallable", e);
        }
      }
    }
    processCallableResult(executionResult);

    switch (firstEndReason) {
      case SUCCESS:
        try {
          taskReporter.taskSucceeded(task.getTaskAttemptID());
          return logAndReturnEndResult(EndReason.SUCCESS, null, null,
              stopContainerRequested.get());
        } catch (IOException e) {
          // Comm failure. Task can't do much.
          handleFinalStatusUpdateFailure(e, "success");
          return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, firstTaskFailureType, e,
              stopContainerRequested.get());
        } catch (TezException e) {
          // Failure from AM. Task can't do much.
          handleFinalStatusUpdateFailure(e, "success");
          return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, firstTaskFailureType, e,
              stopContainerRequested.get());
        }
      case CONTAINER_STOP_REQUESTED:
        // Don't need to send any more communication updates to the AM.
        return logAndReturnEndResult(firstEndReason, firstTaskFailureType, null,
            stopContainerRequested.get());
      case KILL_REQUESTED:
        // This was an external kill called directly on the task runner
        return logAndReturnEndResult(firstEndReason, firstTaskFailureType, null,
            stopContainerRequested.get());
      case TASK_KILL_REQUEST:
        // Task reported a self kill
        return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
            stopContainerRequested.get());
      case COMMUNICATION_FAILURE:
        // Already seen a communication failure. There's no point trying to report another one.
        return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
            stopContainerRequested.get());
      case TASK_ERROR:
        // Don't report an error again if it was reported via signalFatalError
        if (errorReporterToAm.get()) {
          return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
              stopContainerRequested.get());
        } else {
          String message;
          if (firstException instanceof FSError) {
            message = "Encountered an FSError while executing task: " + task.getTaskAttemptID();
          } else if (firstException instanceof Error) {
            message = "Encountered an Error while executing task: " + task.getTaskAttemptID();
          } else {
            message = "Error while running task ( failure ) : " + task.getTaskAttemptID();
          }
          try {
            taskReporter.taskFailed(task.getTaskAttemptID(), firstTaskFailureType,
                firstException, message, exceptionSourceInfo);
            return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
                stopContainerRequested.get());
          } catch (IOException e) {
            // Comm failure. Task can't do much.
            handleFinalStatusUpdateFailure(e, "failure");
            return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
                stopContainerRequested.get());
          } catch (TezException e) {
            // Failure from AM. Task can't do much.
            handleFinalStatusUpdateFailure(e, "failure");
            return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException,
                stopContainerRequested.get());
          }
        }
      default:
        LOG.error("Unexpected EndReason. File a bug");
        return logAndReturnEndResult(EndReason.TASK_ERROR, firstTaskFailureType,
            new RuntimeException("Unexpected EndReason"), stopContainerRequested.get());
    }
  } finally {
    // Wait for any in-progress out-of-band fatal error signal to finish before cleaning up.
    boolean interruptedWhileWaiting = false;
    oobSignalLock.lock();
    try {
      while (oobSignalErrorInProgress) {
        try {
          oobSignalCondition.await();
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for OOB fatal error to complete");
          // Do not re-set the interrupt flag inside the loop: await() throws immediately
          // when the flag is set on entry, which would turn this wait into a busy spin
          // that logs a warning on every iteration. Remember it and restore it below.
          interruptedWhileWaiting = true;
        }
      }
    } finally {
      oobSignalLock.unlock();
    }
    if (interruptedWhileWaiting) {
      // Restore the flag so the cleanup calls below observe the interrupt.
      Thread.currentThread().interrupt();
    }
    taskReporter.unregisterTask(task.getTaskAttemptID());
    if (taskKillStartTime != 0) {
      LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime));
    }
    if (localExecutor != null) {
      localExecutor.shutdown();
    }
    // Clear the interrupted status of the blocking thread, in case it is set after the
    // InterruptedException was invoked.
    Thread.interrupted();
  }
}