in flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java [659:897]
// Handles the submission of a single task, described by a TaskDeploymentDescriptor, to this
// executor.
//
// The method validates the request (known job manager connection, matching leader id, slot
// allocated for the job), re-integrates offloaded descriptor data, wires up the per-task
// services, creates the Task, registers it in the slot table and starts its thread.
//
// Parameters:
//   tdd         - deployment descriptor of the task to run
//   jobMasterId - leader id of the submitting job manager; it must equal the id stored in the
//                 job's connection, otherwise the submission is rejected
//   timeout     - not referenced anywhere in this method body
//
// Returns: a future already completed with Acknowledge once the task thread has been started,
// or a future completed exceptionally with the TaskSubmissionException describing why the
// submission was rejected. Only TaskSubmissionException is converted this way; no other
// exception type is caught here.
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Duration timeout) {
final JobID jobId = tdd.getJobId();
// todo: consider adding task info
// The MDC context built from the job id is closed automatically when this try block exits
// (presumably so that log lines emitted below carry the job id - confirm in MdcUtils).
try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
// Check 1: a connection for this job must be present in the job table.
final JobTable.Connection jobManagerConnection =
jobTable.getConnection(jobId)
.orElseThrow(
() -> {
final String message =
"Could not submit task because there is no JobManager "
+ "associated for the job "
+ jobId
+ '.';
log.debug(message);
return new TaskSubmissionException(message);
});
// Check 2: the caller's leader id must match the one recorded for the connection.
if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message =
"Rejecting the task submission because the job manager leader id "
+ jobMasterId
+ " does not match the expected job manager leader id "
+ jobManagerConnection.getJobMasterId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// Check 3: the slot table must accept marking the (job, allocation) slot as active; a
// false result is reported as "no task slot allocated".
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message =
"No task slot allocated for job ID "
+ jobId
+ " and allocation ID "
+ tdd.getAllocationId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// re-integrate offloaded data and deserialize shuffle descriptors
try {
tdd.loadBigData(
taskExecutorBlobService.getPermanentBlobService(),
jobInformationCache,
taskInformationCache,
shuffleDescriptorsCache);
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}
// deserialize the pre-serialized information
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
jobInformation = tdd.getJobInformation();
taskInformation = tdd.getTaskInformation();
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not deserialize the job or task information.", e);
}
// Sanity check: the job id carried by the descriptor itself and the one inside the
// job information must agree.
if (!jobId.equals(jobInformation.getJobId())) {
throw new TaskSubmissionException(
"Inconsistent job ID information inside TaskDeploymentDescriptor ("
+ tdd.getJobId()
+ " vs. "
+ jobInformation.getJobId()
+ ")");
}
TaskManagerJobMetricGroup jobGroup =
taskManagerMetricGroup.addJob(
jobInformation.getJobId(), jobInformation.getJobName());
// note that a pre-existing job group can NOT be closed concurrently - this is done by
// the same TM thread in removeJobMetricsGroup
TaskMetricGroup taskMetricGroup =
jobGroup.addTask(tdd.getExecutionAttemptId(), taskInformation.getTaskName());
// Both RPC-backed helpers below are built on the job manager gateway of the validated
// connection.
InputSplitProvider inputSplitProvider =
new RpcInputSplitProvider(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getRpcTimeout());
// The third argument is a callback receiving a throwable: it hands a failTask(...) call
// for this execution attempt to runAsync instead of calling failTask directly.
final TaskOperatorEventGateway taskOperatorEventGateway =
new RpcTaskOperatorEventGateway(
jobManagerConnection.getJobManagerGateway(),
executionAttemptID,
(t) ->
runAsync(
() ->
failTask(
jobInformation.getJobId(),
executionAttemptID,
t)));
// Collaborators taken from the job manager connection and passed on to the Task (the
// checkpoint responder is also passed to the task state manager).
TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
GlobalAggregateManager aggregateManager =
jobManagerConnection.getGlobalAggregateManager();
LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
jobManagerConnection.getClassLoaderHandle();
PartitionProducerStateChecker partitionStateChecker =
jobManagerConnection.getPartitionStateChecker();
// State-related services for this subtask: local state store, optional file-merging
// snapshot manager, and changelog storage.
final TaskLocalStateStore localStateStore =
localStateStoresManager.localStateStoreForSubtask(
jobId,
tdd.getAllocationId(),
taskInformation.getJobVertexId(),
tdd.getSubtaskIndex(),
taskManagerConfiguration.getConfiguration(),
jobInformation.getJobConfiguration());
final FileMergingSnapshotManager fileMergingSnapshotManager =
fileMergingManager.fileMergingSnapshotManagerForTask(
jobId,
getResourceID(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getConfiguration(),
jobInformation.getJobConfiguration(),
jobGroup);
// The manager may be null, in which case no wrapper is created and null is passed on.
// Otherwise the wrapper pairs the manager with an action that releases it for this
// (job, execution attempt) via fileMergingManager.
final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManagerClosable =
fileMergingSnapshotManager == null
? null
: FileMergingSnapshotManagerClosableWrapper.of(
fileMergingSnapshotManager,
() ->
fileMergingManager.releaseMergingSnapshotManagerForTask(
jobId, tdd.getExecutionAttemptId()));
// TODO: Pass config value from user program and do overriding here.
final StateChangelogStorage<?> changelogStorage;
try {
changelogStorage =
changelogStoragesManager.stateChangelogStorageForJob(
jobId,
taskManagerConfiguration.getConfiguration(),
jobGroup,
localStateStore.getLocalRecoveryConfig());
} catch (IOException e) {
throw new TaskSubmissionException(e);
}
final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
final TaskStateManager taskStateManager =
new TaskStateManagerImpl(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
fileMergingSnapshotManagerClosable,
changelogStorage,
changelogStoragesManager,
taskRestore,
checkpointResponder);
// The memory manager is looked up by allocation id; a missing slot at this point is
// reported as a submission failure.
MemoryManager memoryManager;
try {
memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
} catch (SlotNotFoundException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
// Assemble the Task from the descriptor data and the services gathered above.
Task task =
new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
memoryManager,
sharedResources,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
partitionStateChecker,
MdcUtils.scopeToJob(jobId, getRpcService().getScheduledExecutor()),
channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
// Register a gauge on the task's metric group that reads task.isBackPressured().
taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);
log.info(
"Received task {} ({}), deploy into slot with allocation id {}.",
task.getTaskInfo().getTaskNameWithSubtasks(),
tdd.getExecutionAttemptId(),
tdd.getAllocationId());
// Register the task with the slot table. A missing or inactive slot surfaces as an
// exception; a false return value is reported below as "already contains a task".
boolean taskAdded;
try {
taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
if (taskAdded) {
// Only start the thread after the task has been accepted by the slot table, then set
// up partition bookkeeping for its produced partitions and termination future.
task.startTaskThread();
setupResultPartitionBookkeeping(
tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager already contains a task for id " + task.getExecutionId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
} catch (TaskSubmissionException e) {
// Every rejection raised above is reported to the caller through the returned future
// rather than thrown from this method.
return FutureUtils.completedExceptionally(e);
}
}