in flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java [574:886]
/**
 * Executes the complete lifecycle of this task on the current thread.
 *
 * <p>The method proceeds in these phases:
 *
 * <ol>
 *   <li>Initial state transition: CREATED is moved to DEPLOYING. If the task is already FAILED,
 *       or is CANCELING (which is then moved to CANCELED), the final state is reported via
 *       {@code notifyFinalState()}, the metric group is closed and the method returns before
 *       any resource is acquired. Any other starting state closes the metric group and throws
 *       an {@link IllegalStateException}.
 *   <li>Bootstrap: activates the FileSystem stream safety net, creates the user-code class
 *       loader, deserializes the {@link ExecutionConfig} (taking task cancellation interval and
 *       timeout overrides from it), sets up and registers result partitions / input gates, and
 *       starts copying distributed-cache files. {@code isCanceledOrFailed()} is checked between
 *       steps and short-circuits via {@link CancelTaskException}.
 *   <li>Builds the {@link RuntimeEnvironment}, instantiates the invokable with the user-code
 *       class loader installed as the thread's context class loader, publishes it to {@code
 *       this.invokable}, and calls {@code restoreAndInvoke}.
 *   <li>On success: finishes every result partition writer and transitions RUNNING to FINISHED.
 *   <li>On any {@link Throwable}: transitions to CANCELED (if a {@link CancelTaskException} is
 *       found in the throwable, or if the task was already CANCELING) or to FAILED otherwise.
 *       Errors while doing so are reported through {@code notifyFatalError}.
 *   <li>Always: releases network, memory, distributed-cache and FileSystem resources, reports
 *       the final state, and closes the metric group last.
 * </ol>
 *
 * <p>NOTE(review): the DEPLOYING -> INITIALIZING -> RUNNING transitions are not performed in
 * this method; presumably they happen inside {@code restoreAndInvoke} — confirm. The FINISHED
 * transition below only succeeds from RUNNING.
 *
 * @throws IllegalStateException if the task is not in CREATED, FAILED or CANCELING when this
 *     method starts.
 */
private void doRun() {
// ----------------------------
// Initial State transition
// ----------------------------
// transitionState(from, to) returns false when the state is no longer 'from' (presumably
// because of a concurrent cancel()/failExternally() call). In that case the loop re-reads the
// current state and decides again.
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.CREATED) {
if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
// success, we can start our work
break;
}
} else if (current == ExecutionState.FAILED) {
// we were immediately failed. tell the TaskManager that we reached our final state.
// Nothing has been acquired yet, so closing the metric group is the only cleanup.
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
} else if (current == ExecutionState.CANCELING) {
if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
// we were immediately canceled. tell the TaskManager that we reached our final
// state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
} else {
// Any other starting state is invalid. This throw happens before the try/finally
// below, so apart from closing the metric group no cleanup runs (none is needed).
if (metrics != null) {
metrics.close();
}
throw new IllegalStateException(
"Invalid state for beginning of operation of task " + this + '.');
}
}
// all resource acquisitions and registrations from here on
// need to be undone in the end (this is done in the finally block below)
Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
// Local copy of the invokable. It stays null until instantiation succeeds. It is still
// available in the catch/finally blocks after this.invokable has been cleared.
TaskInvokable invokable = null;
try {
// ----------------------------
// Task Bootstrap - between the setup steps below we check
// isCanceledOrFailed() and bail out early with a CancelTaskException
// ----------------------------
// activate safety net for task thread
LOG.debug("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader();
// The ExecutionConfig may contain user classes, hence it is deserialized with the
// user-code class loader.
final ExecutionConfig executionConfig =
serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());
Configuration executionConfigConfiguration = executionConfig.toConfiguration();
// override task cancellation interval from Flink config if set in ExecutionConfig
// (the field holds milliseconds; its current value is the fallback when the option is unset)
taskCancellationInterval =
executionConfigConfiguration
.getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
.orElse(Duration.ofMillis(taskCancellationInterval))
.toMillis();
// override task cancellation timeout from Flink config if set in ExecutionConfig
// (same millisecond / fallback handling as the interval above)
taskCancellationTimeout =
executionConfigConfiguration
.getOptional(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)
.orElse(Duration.ofMillis(taskCancellationTimeout))
.toMillis();
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// register the task with the network stack
// this operation may fail if the system does not have enough
// memory to run the necessary data exchanges
// the registration must also strictly be undone
// ----------------------------------------------------------------
LOG.debug("Registering task at network: {}.", this);
setupPartitionsAndGates(partitionWriters, inputGates);
// make each produced partition known to the task event dispatcher
for (ResultPartitionWriter partitionWriter : partitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
}
// next, kick off the background copying of files for the distributed cache.
// Only Futures are collected here; they are handed to the RuntimeEnvironment below.
try {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
Future<Path> cp =
fileCache.createTmpFile(
entry.getKey(), entry.getValue(), jobId, executionId);
distributedCacheEntries.put(entry.getKey(), cp);
}
} catch (Exception e) {
// wrap with task name / execution id for diagnostics, preserving the cause
throw new Exception(
String.format(
"Exception while adding files to distributed cache of task %s (%s).",
taskNameWithSubtask, executionId),
e);
}
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// call the user code initialization methods
// ----------------------------------------------------------------
TaskKvStateRegistry kvStateRegistry =
kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());
// Bundle all task services into the Environment given to the invokable.
Environment env =
new RuntimeEnvironment(
jobId,
jobType,
vertexId,
executionId,
executionConfig,
jobInfo,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
sharedResources,
ioManager,
broadcastVariableManager,
taskStateManager,
aggregateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
partitionWriters,
inputGates,
taskEventDispatcher,
checkpointResponder,
operatorCoordinatorEventGateway,
taskManagerConfig,
metrics,
this,
externalResourceInfoProvider,
channelStateExecutorFactory,
taskManagerActions);
// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
// so that it is available to the invokable during its entire lifetime.
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
// When constructing invokable, separate threads can be constructed and thus should be
// monitored for system exit (in addition to invoking thread itself monitored below).
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
// now load and instantiate the task's invokable code
invokable =
loadAndInstantiateInvokable(
userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
} finally {
// stop monitoring even if instantiation failed
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// NOTE(review): presumably restores state, switches to RUNNING and runs the user
// code; defined outside this view — confirm.
restoreAndInvoke(invokable);
// make sure, we enter the catch block if the task leaves the invoke() method due
// to the fact that it has been canceled
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// finalization of a successful execution
// ----------------------------------------------------------------
// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartitionWriter partitionWriter : partitionWriters) {
if (partitionWriter != null) {
partitionWriter.finish();
}
}
// try to mark the task as finished
// if that fails, the task was canceled/failed in the meantime
if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
throw new CancelTaskException();
}
} catch (Throwable t) {
// ----------------------------------------------------------------
// the execution failed. either the invokable code properly failed, or
// an exception was thrown as a side effect of cancelling
// ----------------------------------------------------------------
// preProcessException (defined elsewhere) may replace t before it is classified below
t = preProcessException(t);
try {
// transition into our final state. we should be either in DEPLOYING, INITIALIZING,
// RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel()
// or to failExternally()
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.RUNNING
|| current == ExecutionState.INITIALIZING
|| current == ExecutionState.DEPLOYING) {
// A CancelTaskException found in t means cancellation, not failure.
// NOTE(review): invokable is still null here if the failure happened before
// instantiation; cancelInvokable is assumed to tolerate null — confirm.
if (ExceptionUtils.findThrowable(t, CancelTaskException.class)
.isPresent()) {
if (transitionState(current, ExecutionState.CANCELED, t)) {
cancelInvokable(invokable);
break;
}
} else {
if (transitionState(current, ExecutionState.FAILED, t)) {
cancelInvokable(invokable);
break;
}
}
} else if (current == ExecutionState.CANCELING) {
// cancellation was already requested; complete it (no cause is attached)
if (transitionState(current, ExecutionState.CANCELED)) {
break;
}
} else if (current == ExecutionState.FAILED) {
// in state failed already, no transition necessary any more
break;
}
// unexpected state, go to failed
else if (transitionState(current, ExecutionState.FAILED, t)) {
LOG.error(
"Unexpected state in task {} ({}) during an exception: {}.",
taskNameWithSubtask,
executionId,
current);
break;
}
// otherwise the transition lost a race with a concurrent state change:
// loop, re-read the current state and retry
}
} catch (Throwable tt) {
// failing to reach a final state is unrecoverable for this task
String message =
String.format(
"FATAL - exception in exception handler of task %s (%s).",
taskNameWithSubtask, executionId);
LOG.error(message, tt);
notifyFatalError(message, tt);
}
} finally {
try {
LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
// clear the reference to the invokable. this helps guard against holding references
// to the invokable and its structures in cases where this Task object is still
// referenced
this.invokable = null;
// free the network resources
releaseResources();
// free memory resources
// (presumably memory that was allocated with the invokable as owner — confirm)
if (invokable != null) {
memoryManager.releaseAll(invokable);
}
// remove all of the tasks resources
fileCache.releaseJob(jobId, executionId);
// close and de-activate safety net for task thread
LOG.debug("Ensuring all FileSystem streams are closed for task {}", this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
// report the final state only after all resources above were released
notifyFinalState();
} catch (Throwable t) {
// an error in the resource cleanup is fatal
String message =
String.format(
"FATAL - exception in resource cleanup of task %s (%s).",
taskNameWithSubtask, executionId);
LOG.error(message, t);
notifyFatalError(message, t);
}
// un-register the metrics at the end so that the task may already be
// counted as finished when this happens
// errors here will only be logged
// NOTE(review): unlike the early-exit paths above, there is no metrics != null check
// here; if metrics can be null, the resulting NPE is only logged as an error below.
try {
metrics.close();
} catch (Throwable t) {
LOG.error(
"Error during metrics de-registration of task {} ({}).",
taskNameWithSubtask,
executionId,
t);
}
}
}