private ComputeTaskInternalFuture startTask()

in modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java [527:774]


    /**
     * Resolves and deploys the task (by name, by class or from the given instance), creates a task session
     * and a future for it, and then either starts a {@link GridTaskWorker} or reports the failure through
     * {@code handleException}.
     * <p>
     * The task is looked up in this order: {@code taskName}, then {@code taskCls}, then {@code task}; the
     * assertions below expect the lower-priority arguments to be {@code null} when a higher-priority one is set.
     * An {@link IgniteCheckedException} raised while deploying or while saving task metadata is not rethrown:
     * it is remembered and passed to {@code handleException} together with the returned future.
     *
     * @param taskName Name of an already deployed task, or {@code null}.
     * @param taskCls Task class to deploy, or {@code null}.
     * @param task Task instance to deploy, or {@code null}.
     * @param sesId Task session ID, must not be {@code null}.
     * @param arg Optional task argument, handed over to the task worker.
     * @param opts Task execution options (timeout, projection, executor, system-task flag).
     * @param <T> Task argument type.
     * @param <R> Task result type.
     * @return Future created for the task session.
     */
    private <T, R> ComputeTaskInternalFuture<R> startTask(
        @Nullable String taskName,
        @Nullable Class<?> taskCls,
        @Nullable ComputeTask<T, R> task,
        IgniteUuid sesId,
        @Nullable T arg,
        TaskExecutionOptions opts
    ) {
        assert sesId != null;

        authorizeUserTask(taskName, taskCls, task, opts);

        assert opts.timeout() >= 0;

        // A zero timeout is treated as "no limit" and mapped to Long.MAX_VALUE.
        long timeout0 = opts.timeout() == 0 ? Long.MAX_VALUE : opts.timeout();

        long startTime = U.currentTimeMillis();

        long endTime = timeout0 + startTime;

        // Account for overflow.
        if (endTime < 0)
            endTime = Long.MAX_VALUE;

        // Deployment failures are collected here and reported after the session and future exist.
        IgniteCheckedException deployEx = null;
        GridDeployment dep = null;

        // User provided task name.
        if (taskName != null) {
            assert taskCls == null;
            assert task == null;

            try {
                dep = ctx.deploy().getDeployment(taskName);

                if (dep == null)
                    throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " +
                        "task (was task (re|un)deployed?): " + taskName);

                IgniteBiTuple<Class<?>, Throwable> cls = dep.deployedClass(taskName);

                if (cls.get1() == null)
                    throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " +
                        "task (was task (re|un)deployed?) [taskName=" + taskName + ", dep=" + dep + ']', cls.get2());

                taskCls = cls.get1();

                if (!ComputeTask.class.isAssignableFrom(taskCls))
                    throw new IgniteCheckedException("Failed to auto-deploy task (deployed class is not a task) " +
                        "[taskName=" +
                        taskName + ", depCls=" + taskCls + ']');
            }
            catch (IgniteCheckedException e) {
                deployEx = e;
            }
        }
        // Deploy user task class.
        else if (taskCls != null) {
            assert task == null;

            try {
                // Implicit deploy.
                dep = ctx.deploy().deploy(taskCls, U.detectClassLoader(taskCls));

                if (dep == null)
                    throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " +
                        "(was task (re|un)deployed?): " + taskCls);

                taskName = taskName(dep, taskCls, opts);
            }
            catch (IgniteCheckedException e) {
                // Fall back to the class name so the session still gets a non-null task name.
                taskName = taskCls.getName();

                deployEx = e;
            }
        }
        // Deploy user task.
        else if (task != null) {
            try {
                ClassLoader ldr;

                Class<?> cls;

                if (task instanceof GridPeerDeployAware) {
                    GridPeerDeployAware depAware = (GridPeerDeployAware)task;

                    cls = depAware.deployClass();
                    ldr = depAware.classLoader();

                    // Set proper class name to make peer-loading possible.
                    taskCls = cls;
                }
                else {
                    taskCls = task.getClass();

                    assert ComputeTask.class.isAssignableFrom(taskCls);

                    cls = task.getClass();
                    ldr = U.detectClassLoader(cls);
                }

                // Explicit deploy.
                dep = ctx.deploy().deploy(cls, ldr);

                if (dep == null)
                    throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " +
                        "(was task (re|un)deployed?): " + cls);

                taskName = taskName(dep, taskCls, opts);
            }
            catch (IgniteCheckedException e) {
                // Fall back to the instance's class name so the session still gets a non-null task name.
                taskName = task.getClass().getName();

                deployEx = e;
            }
        }

        assert taskName != null;

        if (log.isDebugEnabled())
            log.debug("Task deployment: " + dep);

        // Full session support is requested either by an annotation on the deployed class or by a platform task.
        boolean fullSup = (dep != null && taskCls != null &&
            dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null) ||
            (task instanceof PlatformFullTask && ((PlatformFullTask)task).taskSessionFullSupport());

        Collection<UUID> top = null;

        final IgnitePredicate<ClusterNode> topPred = opts.projectionPredicate();

        // An explicit node ID list is only built when no projection predicate is supplied.
        if (topPred == null) {
            final Collection<ClusterNode> nodes = opts.projection();

            top = nodes != null ? nodeIds(nodes) : null;
        }

        boolean internal = false;

        // Without a deployment or a task class there must be a recorded deployment error.
        if (dep == null || taskCls == null)
            assert deployEx != null;
        else
            internal = dep.internalTask(task, taskCls);

        // Creates task session with task name and task version.
        GridTaskSessionImpl ses = ctx.session().createTaskSession(
            sesId,
            ctx.localNodeId(),
            taskName,
            dep,
            taskCls == null ? null : taskCls.getName(),
            top,
            topPred,
            startTime,
            endTime,
            Collections.emptyList(),
            emptyMap(),
            fullSup,
            internal,
            opts.executor(),
            ctx.security().securityContext()
        );

        ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);

        IgniteCheckedException securityEx = null;

        // Reuse the 'internal' flag computed above instead of querying the deployment a second time. The
        // explicit null check keeps a missing deployment on the "Task not deployed" path below instead of
        // failing here with a NullPointerException when assertions are disabled.
        if (ctx.security().enabled() && deployEx == null && dep != null && !internal) {
            try {
                saveTaskMetadata(taskName);
            }
            catch (IgniteCheckedException e) {
                securityEx = e;
            }
        }

        if (deployEx == null && securityEx == null) {
            if (dep == null || !dep.acquire())
                handleException(new IgniteDeploymentCheckedException("Task not deployed: " + ses.getTaskName()), fut);
            else {
                GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(
                    ctx,
                    arg,
                    ses,
                    fut,
                    taskCls,
                    task,
                    dep,
                    new TaskEventListener(),
                    opts,
                    securitySubjectId(ctx));

                // Register the worker under its session ID; a previous mapping means the ID was reused.
                GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, taskWorker);

                assert taskWorker0 == null : "Session ID is not unique: " + sesId;

                if (ctx.event().isRecordable(EVT_MANAGEMENT_TASK_STARTED) && dep.visorManagementTask(task, taskCls)) {
                    VisorTaskArgument visorTaskArg = (VisorTaskArgument)arg;

                    Event evt = new ManagementTaskEvent(
                        ctx.discovery().localNode(),
                        visorTaskArg != null && visorTaskArg.getArgument() != null
                            ? visorTaskArg.getArgument().toString() : "[]",
                        EVT_MANAGEMENT_TASK_STARTED,
                        ses.getId(),
                        taskName,
                        taskCls == null ? null : taskCls.getName(),
                        false,
                        securitySubjectId(ctx),
                        visorTaskArg
                    );

                    ctx.event().record(evt);
                }

                if (!ctx.clientDisconnected()) {
                    // Tasks annotated with ComputeTaskMapAsync are handed to a pool; all others run in the
                    // calling thread.
                    if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
                        try {
                            // Start task execution in another thread.
                            if (opts.isSystemTask())
                                ctx.pools().getSystemExecutorService().execute(taskWorker);
                            else
                                ctx.pools().getExecutorService().execute(taskWorker);
                        }
                        catch (RejectedExecutionException e) {
                            // The worker never started: undo the registration and release the deployment
                            // acquired above before reporting the rejection.
                            tasks.remove(sesId);

                            release(dep);

                            handleException(new ComputeExecutionRejectedException("Failed to execute task " +
                                "due to thread pool execution rejection: " + taskName, e), fut);
                        }
                    }
                    else
                        taskWorker.run();
                }
                else
                    // Client node is disconnected: finish the task with a disconnect error instead of running it.
                    taskWorker.finishTask(null, disconnectedError(null));
            }
        }
        else {
            // A deployment error takes precedence over a task-metadata (security) error.
            if (deployEx != null)
                handleException(deployEx, fut);
            else
                handleException(securityEx, fut);
        }

        return fut;
    }