public void processJobExecuteRequest()

in modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java [1214:1463]


    /**
     * Processes a job execution request received from the given node.
     * <p>
     * The method resolves the deployment for the requested task, acquires it, restores the session and job
     * attributes from the request and creates a {@link GridJobWorker}. Depending on the job kind and on
     * {@code jobAlwaysActivate} the worker is then:
     * <ul>
     *     <li>executed synchronously while the read lock is held (internal jobs);</li>
     *     <li>submitted for asynchronous execution (request was sent by the local node);</li>
     *     <li>executed in the calling thread after the read lock has been released (request was sent by
     *     another node and the job was activated);</li>
     *     <li>put into {@code passiveJobs}, followed by a collision handling pass
     *     ({@code jobAlwaysActivate} is {@code false}).</li>
     * </ul>
     * If the read lock cannot be taken the request is ignored. If the deployment cannot be found or acquired,
     * or the request attributes cannot be unmarshalled, the failure is passed to {@code handleException}.
     *
     * @param node Node the request was received from.
     * @param req Job execution request.
     */
    public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteRequest req) {
        if (log.isDebugEnabled())
            log.debug("Received job request message [req=" + req + ", nodeId=" + node.id() + ']');

        PartitionsReservation partsReservation = null;

        // Cache IDs in the request mean the job has to be bound to a partition reservation.
        if (req.getCacheIds() != null) {
            assert req.getPartition() >= 0 : req;
            assert !F.isEmpty(req.getCacheIds()) : req;

            partsReservation = new PartitionsReservation(req.getCacheIds(), req.getPartition(), req.getTopVer());
        }

        // Stays non-null only if the job has to be run in this thread after the read lock is released.
        GridJobWorker job = null;

        if (!rwLock.tryReadLock()) {
            if (log.isDebugEnabled())
                log.debug("Received job execution request while stopping this node (will ignore): " + req);

            return;
        }

        try {
            long endTime = req.getCreateTime() + req.getTimeout();

            // Account for overflow.
            if (endTime < 0)
                endTime = Long.MAX_VALUE;

            GridDeployment tmpDep = req.isForceLocalDeployment() ?
                ctx.deploy().getLocalDeployment(req.getTaskClassName()) :
                ctx.deploy().getGlobalDeployment(
                    req.getDeploymentMode(),
                    req.getTaskName(),
                    req.getTaskClassName(),
                    req.getUserVersion(),
                    node.id(),
                    req.getClassLoaderId(),
                    req.getLoaderParticipants(),
                    null);

            if (tmpDep == null) {
                if (log.isDebugEnabled())
                    log.debug("Checking local tasks...");

                // Check local tasks: fall back to a deployment already used by a task with the same class loader ID.
                for (Map.Entry<String, GridDeployment> d : ctx.task().getUsedDeploymentMap().entrySet()) {
                    if (d.getValue().classLoaderId().equals(req.getClassLoaderId())) {
                        assert d.getValue().local();

                        tmpDep = d.getValue();

                        break;
                    }
                }
            }

            final GridDeployment dep = tmpDep;

            if (log.isDebugEnabled())
                log.debug("Deployment: " + dep);

            // Set to true only after a successful acquire(), so that the finally block below never releases
            // a deployment that this call did not acquire.
            // NOTE(review): assumes a failed acquire() must not be paired with release() - confirm against
            // GridDeployment.
            boolean releaseDep = false;

            try {
                if (dep != null && dep.acquire()) {
                    releaseDep = true;

                    GridJobSessionImpl jobSes;
                    GridJobContextImpl jobCtx;

                    try {
                        List<ComputeJobSibling> siblings = null;

                        if (!req.isDynamicSiblings()) {
                            Collection<ComputeJobSibling> siblings0 = req.getSiblings();

                            // Siblings were not passed as objects, so they are expected in marshalled form.
                            if (siblings0 == null) {
                                assert req.getSiblingsBytes() != null;

                                siblings0 = U.unmarshal(marsh, req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
                            }

                            siblings = new ArrayList<>(siblings0);
                        }

                        Map<Object, Object> sesAttrs = null;

                        if (req.isSessionFullSupport()) {
                            sesAttrs = req.getSessionAttributes();

                            if (sesAttrs == null)
                                sesAttrs = U.unmarshal(marsh, req.getSessionAttributesBytes(),
                                    U.resolveClassLoader(dep.classLoader(), ctx.config()));
                        }

                        IgnitePredicate<ClusterNode> topPred = req.getTopologyPredicate();

                        if (topPred == null && req.getTopologyPredicateBytes() != null) {
                            topPred = U.unmarshal(marsh, req.getTopologyPredicateBytes(),
                                U.resolveClassLoader(dep.classLoader(), ctx.config()));
                        }

                        // Note that we unmarshal session/job attributes here with proper class loader.
                        GridTaskSessionImpl taskSes = ctx.session().createTaskSession(
                            req.getSessionId(),
                            node.id(),
                            req.getTaskName(),
                            dep,
                            req.getTaskClassName(),
                            req.topology(),
                            topPred,
                            req.getStartTaskTime(),
                            endTime,
                            siblings,
                            sesAttrs,
                            req.isSessionFullSupport(),
                            req.isInternal(),
                            req.executorName(),
                            ctx.security().securityContext()
                        );

                        taskSes.setCheckpointSpi(req.getCheckpointSpi());
                        taskSes.setClassLoader(dep.classLoader());

                        jobSes = new GridJobSessionImpl(ctx, taskSes, req.getJobId());

                        Map<? extends Serializable, ? extends Serializable> jobAttrs = req.getJobAttributes();

                        if (jobAttrs == null)
                            jobAttrs = U.unmarshal(marsh, req.getJobAttributesBytes(),
                                U.resolveClassLoader(dep.classLoader(), ctx.config()));

                        jobCtx = new GridJobContextImpl(ctx, req.getJobId(), jobAttrs);
                    }
                    catch (IgniteCheckedException e) {
                        IgniteException ex = new IgniteException("Failed to deserialize task attributes " +
                            "[taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() +
                            ", codeVer=" + req.getUserVersion() + ", taskClsLdr=" + dep.classLoader() + ']', e);

                        U.error(log, ex.getMessage(), e);

                        handleException(node, req, ex, endTime);

                        // The acquired deployment is released by the outer finally block (releaseDep is still true).
                        return;
                    }

                    job = new GridJobWorker(
                        ctx,
                        dep,
                        req.getCreateTime(),
                        jobSes,
                        jobCtx,
                        req.getJobBytes(),
                        req.getJob(),
                        node,
                        req.isInternal(),
                        evtLsnr,
                        holdLsnr,
                        partsReservation,
                        req.getTopVer(),
                        req.executorName(),
                        this::computeJobWorkerInterruptTimeout
                    );

                    jobCtx.job(job);

                    // If exception occurs on job initialization, deployment is released in job listener.
                    releaseDep = false;

                    if (job.initialize(dep, dep.deployedClass(req.getTaskClassName()).get1())) {
                        // Internal jobs will always be executed synchronously.
                        if (job.isInternal()) {
                            // This is an internal job and can be executed inside busy lock
                            // since job is expected to be short.
                            // This is essential for proper stop without races.
                            runSync(job);

                            // No execution outside lock.
                            job = null;
                        }
                        else if (jobAlwaysActivate) {
                            if (onBeforeActivateJob(job)) {
                                if (ctx.localNodeId().equals(node.id())) {
                                    // Always execute in another thread for local node.
                                    executeAsync(job);

                                    // No sync execution.
                                    job = null;
                                }
                                else {
                                    // 'job' is left non-null here, so it is run at the end of this method.
                                    if (metricsUpdateFreq > -1L)
                                        // Job will be executed synchronously.
                                        startedJobsCnt.increment();

                                    startedJobsMetric.increment();
                                }

                            }
                            else
                                // Job has been cancelled.
                                // Set to null, to avoid sync execution.
                                job = null;
                        }
                        else {
                            GridJobWorker old = passiveJobs.putIfAbsent(job.getJobId(), job);

                            if (old == null) {
                                waitingJobsMetric.increment();

                                handleCollisions();
                            }
                            else
                                U.error(log, "Received computation request with duplicate job ID (could be " +
                                    "network malfunction, source node may hang if task timeout was not set) " +
                                    "[srcNode=" + node.id() +
                                    ", jobId=" + req.getJobId() + ", sesId=" + req.getSessionId() +
                                    ", locNodeId=" + ctx.localNodeId() + ']');

                            // No sync execution.
                            job = null;
                        }
                    }
                    else
                        // Job was not initialized, no execution.
                        job = null;
                }
                else {
                    // Deployment was not found or could not be acquired.
                    IgniteException ex = new IgniteDeploymentException("Task was not deployed or was redeployed since " +
                        "task execution [taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() +
                        ", codeVer=" + req.getUserVersion() + ", clsLdrId=" + req.getClassLoaderId() +
                        ", seqNum=" + req.getClassLoaderId().localId() + ", depMode=" + req.getDeploymentMode() +
                        ", dep=" + dep + ']');

                    U.error(log, ex.getMessage(), ex);

                    handleException(node, req, ex, endTime);
                }
            }
            finally {
                // Release only a deployment acquired above whose ownership was not handed over to the job.
                if (dep != null && releaseDep)
                    release(dep);
            }
        }
        finally {
            rwLock.readUnlock();
        }

        // Run outside of the read lock.
        if (job != null)
            job.run();
    }