in tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java [170:251]
/**
 * Drives one task attempt through initialize, run and close while executing as the user
 * represented by {@code ugi}, then reports the outcome.
 *
 * <p>On a clean finish with no registered exception, success is reported through
 * {@code taskReporter}. On failure the throwable is registered via
 * {@code maybeRegisterFirstException}, reported on a best-effort basis via
 * {@code sendFailure}, and then rethrown from the privileged action:
 * <ul>
 *   <li>{@code FSError} is rethrown as is;</li>
 *   <li>any other {@code Error} is handed to {@code ExitUtil.terminate(-1, ...)};</li>
 *   <li>{@code IOException} and {@code TezException} are rethrown as is;</li>
 *   <li>everything else is wrapped in a {@code TezException}.</li>
 * </ul>
 * {@code task.cleanup()} always runs before the action returns, and {@code taskRunning} is
 * always reset to {@code false} before this method returns.
 *
 * @return always {@code null}
 * @throws Exception whatever {@code ugi.doAs} surfaces for the failure thrown by the action
 *     (NOTE(review): how {@code doAs} wraps checked exceptions is not visible here - confirm
 *     against the Hadoop documentation)
 */
public Void call() throws Exception {
  try {
    return ugi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
          task.initialize();
          // Only run the task if nothing went wrong (and nobody interrupted us) during
          // initialization.
          // NOTE(review): when the thread is interrupted but no exception has been registered,
          // run/close is skipped and the success report below is still sent - confirm that
          // this is intended.
          if (!Thread.currentThread().isInterrupted() && firstException == null) {
            LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
            task.run();
            LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
            task.close();
            task.setFrameworkCounters();
          }
          LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
              + ", fatalErrorOccurred=" + (firstException != null));
          if (firstException == null) {
            try {
              taskReporter.taskSucceeded(task.getTaskAttemptID());
            } catch (IOException e) {
              LOG.warn("Heartbeat failure caused by communication failure", e);
              maybeRegisterFirstException(e);
              // Falling off, since the runner thread checks for the registered exception.
            } catch (TezException e) {
              LOG.warn("Heartbeat failure reported by AM", e);
              maybeRegisterFirstException(e);
              // Falling off, since the runner thread checks for the registered exception.
            }
          }
          return null;
        } catch (Throwable cause) {
          if (cause instanceof FSError) {
            // Not immediately fatal, this is an error reported by Hadoop FileSystem
            maybeRegisterFirstException(cause);
            LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
                cause);
            sendFailureQuietly(cause, "FS Error in Child JVM");
            throw (FSError) cause;
          } else if (cause instanceof Error) {
            LOG.error("Exception of type Error. Exiting now", cause);
            ExitUtil.terminate(-1, cause);
          } else {
            // Unwrap the reflective wrapper so the real failure is registered and reported.
            // A wrapper without a cause is kept as is: unwrapping it would register, report
            // and rethrow null, hiding the failure.
            Throwable failure = cause;
            if (cause instanceof UndeclaredThrowableException && cause.getCause() != null) {
              failure = cause.getCause();
            }
            maybeRegisterFirstException(failure);
            LOG.info("Encountered an error while executing task: " + task.getTaskAttemptID(),
                failure);
            sendFailureQuietly(failure, "Failure while running task");
            if (failure instanceof IOException) {
              throw (IOException) failure;
            } else if (failure instanceof TezException) {
              throw (TezException) failure;
            } else {
              throw new TezException(failure);
            }
          }
        } finally {
          task.cleanup();
        }
        // Reached only if ExitUtil.terminate returned instead of ending the JVM or throwing.
        return null;
      }

      /**
       * Reports a failure via {@code sendFailure} on a best-effort basis: an exception raised
       * while reporting is logged and dropped, because the primary failure is already known.
       */
      private void sendFailureQuietly(Throwable failure, String diagnostics) {
        try {
          sendFailure(failure, diagnostics);
        } catch (Exception ignored) {
          // Ignored since another cause is already known
          LOG.info(
              "Ignoring the following exception since a previous exception is already registered",
              ignored);
        }
      }
    });
  } finally {
    taskRunning.set(false);
  }
}