public CompletableFuture submitTask()

in flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java [659:897]


    /**
     * Validates a task deployment request, assembles the {@link Task} with all of its services and
     * starts the task thread.
     *
     * <p>The returned future is completed exceptionally with a {@link TaskSubmissionException} when
     *
     * <ul>
     *   <li>no job manager connection is registered for the job,
     *   <li>{@code jobMasterId} differs from the id of the registered connection,
     *   <li>the slot for the descriptor's allocation id cannot be marked active,
     *   <li>offloaded or pre-serialized descriptor data cannot be loaded or deserialized,
     *   <li>the job id in the descriptor differs from the one in the job information,
     *   <li>the changelog storage or the slot's memory manager cannot be obtained, or
     *   <li>the task slot table rejects the task or already holds a task with the same execution
     *       id.
     * </ul>
     *
     * <p>Only {@link TaskSubmissionException} is converted into a failed future; any other exception
     * raised while building the task propagates to the caller.
     *
     * @param tdd descriptor of the task to deploy
     * @param jobMasterId id of the job manager leader that issued the request
     * @param timeout not read by this method
     * @return a future already completed with {@code Acknowledge.get()} once the task thread has
     *     been started, or an exceptionally completed future carrying the rejection reason
     */
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Duration timeout) {

        final JobID jobId = tdd.getJobId();
        // todo: consider adding task info
        // The job-scoped MDC context stays installed for the whole submission and is closed on exit.
        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {

            final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();

            final JobTable.Connection jobManagerConnection =
                    jobTable.getConnection(jobId)
                            .orElseThrow(
                                    () -> {
                                        final String message =
                                                "Could not submit task because there is no JobManager "
                                                        + "associated for the job "
                                                        + jobId
                                                        + '.';

                                        log.debug(message);
                                        return new TaskSubmissionException(message);
                                    });

            // Reject requests coming from a job manager other than the one we are connected to.
            if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
                final String message =
                        "Rejecting the task submission because the job manager leader id "
                                + jobMasterId
                                + " does not match the expected job manager leader id "
                                + jobManagerConnection.getJobMasterId()
                                + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
                final String message =
                        "No task slot allocated for job ID "
                                + jobId
                                + " and allocation ID "
                                + tdd.getAllocationId()
                                + '.';
                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            // re-integrate offloaded data and deserialize shuffle descriptors
            try {
                tdd.loadBigData(
                        taskExecutorBlobService.getPermanentBlobService(),
                        jobInformationCache,
                        taskInformationCache,
                        shuffleDescriptorsCache);
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException(
                        "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }

            // deserialize the pre-serialized information
            final JobInformation jobInformation;
            final TaskInformation taskInformation;
            try {
                jobInformation = tdd.getJobInformation();
                taskInformation = tdd.getTaskInformation();
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException(
                        "Could not deserialize the job or task information.", e);
            }

            if (!jobId.equals(jobInformation.getJobId())) {
                throw new TaskSubmissionException(
                        "Inconsistent job ID information inside TaskDeploymentDescriptor ("
                                + jobId
                                + " vs. "
                                + jobInformation.getJobId()
                                + ")");
            }

            TaskManagerJobMetricGroup jobGroup =
                    taskManagerMetricGroup.addJob(
                            jobInformation.getJobId(), jobInformation.getJobName());

            // note that a pre-existing job group can NOT be closed concurrently - this is done by
            // the same TM thread in removeJobMetricsGroup
            TaskMetricGroup taskMetricGroup =
                    jobGroup.addTask(executionAttemptID, taskInformation.getTaskName());

            InputSplitProvider inputSplitProvider =
                    new RpcInputSplitProvider(
                            jobManagerConnection.getJobManagerGateway(),
                            taskInformation.getJobVertexId(),
                            executionAttemptID,
                            taskManagerConfiguration.getRpcTimeout());

            // Failures reported by the operator event gateway fail the task via runAsync.
            final TaskOperatorEventGateway taskOperatorEventGateway =
                    new RpcTaskOperatorEventGateway(
                            jobManagerConnection.getJobManagerGateway(),
                            executionAttemptID,
                            (t) ->
                                    runAsync(
                                            () ->
                                                    failTask(
                                                            jobInformation.getJobId(),
                                                            executionAttemptID,
                                                            t)));

            TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            GlobalAggregateManager aggregateManager =
                    jobManagerConnection.getGlobalAggregateManager();

            LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
                    jobManagerConnection.getClassLoaderHandle();
            PartitionProducerStateChecker partitionStateChecker =
                    jobManagerConnection.getPartitionStateChecker();

            final TaskLocalStateStore localStateStore =
                    localStateStoresManager.localStateStoreForSubtask(
                            jobId,
                            tdd.getAllocationId(),
                            taskInformation.getJobVertexId(),
                            tdd.getSubtaskIndex(),
                            taskManagerConfiguration.getConfiguration(),
                            jobInformation.getJobConfiguration());

            final FileMergingSnapshotManager fileMergingSnapshotManager =
                    fileMergingManager.fileMergingSnapshotManagerForTask(
                            jobId,
                            getResourceID(),
                            executionAttemptID,
                            taskManagerConfiguration.getConfiguration(),
                            jobInformation.getJobConfiguration(),
                            jobGroup);

            // The release callback captures only the attempt id. Capturing `tdd` here would keep
            // the whole deployment descriptor reachable for as long as the callback is held.
            final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManagerClosable =
                    fileMergingSnapshotManager == null
                            ? null
                            : FileMergingSnapshotManagerClosableWrapper.of(
                                    fileMergingSnapshotManager,
                                    () ->
                                            fileMergingManager.releaseMergingSnapshotManagerForTask(
                                                    jobId, executionAttemptID));

            // TODO: Pass config value from user program and do overriding here.
            final StateChangelogStorage<?> changelogStorage;
            try {
                changelogStorage =
                        changelogStoragesManager.stateChangelogStorageForJob(
                                jobId,
                                taskManagerConfiguration.getConfiguration(),
                                jobGroup,
                                localStateStore.getLocalRecoveryConfig());
            } catch (IOException e) {
                throw new TaskSubmissionException(e);
            }

            final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

            final TaskStateManager taskStateManager =
                    new TaskStateManagerImpl(
                            jobId,
                            executionAttemptID,
                            localStateStore,
                            fileMergingSnapshotManagerClosable,
                            changelogStorage,
                            changelogStoragesManager,
                            taskRestore,
                            checkpointResponder);

            MemoryManager memoryManager;
            try {
                memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
            } catch (SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            Task task =
                    new Task(
                            jobInformation,
                            taskInformation,
                            executionAttemptID,
                            tdd.getAllocationId(),
                            tdd.getProducedPartitions(),
                            tdd.getInputGates(),
                            memoryManager,
                            sharedResources,
                            taskExecutorServices.getIOManager(),
                            taskExecutorServices.getShuffleEnvironment(),
                            taskExecutorServices.getKvStateService(),
                            taskExecutorServices.getBroadcastVariableManager(),
                            taskExecutorServices.getTaskEventDispatcher(),
                            externalResourceInfoProvider,
                            taskStateManager,
                            taskManagerActions,
                            inputSplitProvider,
                            checkpointResponder,
                            taskOperatorEventGateway,
                            aggregateManager,
                            classLoaderHandle,
                            fileCache,
                            taskManagerConfiguration,
                            taskMetricGroup,
                            partitionStateChecker,
                            MdcUtils.scopeToJob(jobId, getRpcService().getScheduledExecutor()),
                            channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));

            taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);

            log.info(
                    "Received task {} ({}), deploy into slot with allocation id {}.",
                    task.getTaskInfo().getTaskNameWithSubtasks(),
                    executionAttemptID,
                    tdd.getAllocationId());

            boolean taskAdded;

            try {
                taskAdded = taskSlotTable.addTask(task);
            } catch (SlotNotFoundException | SlotNotActiveException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            if (taskAdded) {
                // The thread is started only after the slot table has accepted the task.
                task.startTaskThread();

                setupResultPartitionBookkeeping(
                        jobId, tdd.getProducedPartitions(), task.getTerminationFuture());
                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message =
                        "TaskManager already contains a task for id " + task.getExecutionId() + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }
        } catch (TaskSubmissionException e) {
            // Expected rejections are reported through the future instead of being thrown.
            return FutureUtils.completedExceptionally(e);
        }
    }