void onResponse()

in modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java [723:1024]


    /**
     * Handles a job execution response for this task, then keeps draining responses that were
     * queued in {@code delayedRess} while another response was being processed.
     * <p>
     * Responses are processed one at a time: {@code lockRespProc} serializes processing across all
     * jobs of the task, and the per-job occupied flag prevents two responses for the same job from
     * being handled simultaneously. A response arriving while either is held is offered to
     * {@code delayedRess}; the thread that set the flags clears them in its {@code finally} block
     * and polls the next delayed response.
     * <p>
     * For each accepted response the result policy is obtained via {@code result(..)} and acted upon:
     * retry (affinity call/run only), wait for more results, reduce, or fail over. If the affinity
     * topology future is not yet done, the response is re-submitted to this method from a listener
     * on that future.
     *
     * @param msg Job execution response; must not be {@code null}.
     */
    void onResponse(GridJobExecuteResponse msg) {
        assert msg != null;

        if (fut.isDone()) {
            if (log.isDebugEnabled())
                log.debug("Ignoring job response since task has finished: " + msg);

            return;
        }

        GridJobExecuteResponse res = msg;

        while (res != null) {
            GridJobResultImpl jobRes = null;

            // Flag indicating whether occupied flag for
            // job response was changed in this method apply.
            boolean selfOccupied = false;

            IgniteInternalFuture<?> affFut = null;

            boolean waitForAffTop = false;

            // Captured for the affinity-topology listener below, which re-submits this response.
            final GridJobExecuteResponse failoverRes = res;

            try {
                synchronized (mux) {
                    // If task is not waiting for responses,
                    // then there is no point to proceed.
                    if (state != State.WAITING) {
                        if (log.isDebugEnabled())
                            log.debug("Ignoring response since task is already reducing or finishing [res=" + res +
                                ", job=" + ses + ", state=" + state + ']');

                        return;
                    }

                    jobRes = this.jobRes.get(res.getJobId());

                    if (jobRes == null) {
                        if (log.isDebugEnabled())
                            U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);

                        res = delayedRess.poll();

                        // We can not return here because there can be more delayed messages in the queue.
                        continue;
                    }

                    // Only process 1st response and ignore following ones. This scenario
                    // is possible if node has left topology and fake failure response
                    // was created from discovery listener and when sending request failed.
                    if (jobRes.hasResponse()) {
                        if (log.isDebugEnabled())
                            log.debug("Received redundant response for a job (will ignore): " + res);

                        res = delayedRess.poll();

                        // We can not return here because there can be more delayed messages in the queue.
                        continue;
                    }

                    if (!jobRes.getNode().id().equals(res.getNodeId())) {
                        if (log.isDebugEnabled())
                            log.debug("Ignoring stale response as job was already resent to other node [res=" + res +
                                ", jobRes=" + jobRes + ']');

                        // The stale response is simply dropped. Do NOT mark the job occupied (and thus set
                        // selfOccupied) here: this check runs before the occupied/lockRespProc checks, so those
                        // flags may currently be held by another thread, and the finally block would otherwise
                        // release them on that thread's behalf.
                        res = delayedRess.poll();

                        // We can not return here because there can be more delayed messages in the queue.
                        continue;
                    }

                    if (jobRes.isOccupied()) {
                        if (log.isDebugEnabled())
                            log.debug("Adding response to delayed queue (job is either being sent or processing " +
                                "another response): " + res);

                        delayedRess.offer(res);

                        return;
                    }

                    // Another response (possibly for a different job) is being processed right now.
                    if (lockRespProc) {
                        delayedRess.offer(res);

                        return;
                    }

                    lockRespProc = true;

                    selfOccupied = true;

                    // Prevent processing 2 responses for the same job simultaneously.
                    jobRes.setOccupied(true);

                    // We don't keep reference to job if results are not cached.
                    if (!resCache)
                        this.jobRes.remove(res.getJobId());
                }

                if (res.getFakeException() != null)
                    jobRes.onResponse(null, res.getFakeException(), null, false);
                else {
                    ClassLoader clsLdr = dep.classLoader();

                    try {
                        // Local results are passed as objects unless local jobs are configured to be marshalled.
                        boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();

                        Object res0 = loc ? res.getJobResult() : U.unmarshal(marsh, res.getJobResultBytes(),
                            U.resolveClassLoader(clsLdr, ctx.config()));

                        IgniteException ex = loc ? res.getException() :
                            U.<IgniteException>unmarshal(marsh, res.getExceptionBytes(),
                                U.resolveClassLoader(clsLdr, ctx.config()));

                        Map<Object, Object> attrs = loc ? res.getJobAttributes() :
                            U.<Map<Object, Object>>unmarshal(marsh, res.getJobAttributesBytes(),
                                U.resolveClassLoader(clsLdr, ctx.config()));

                        jobRes.onResponse(res0, ex, attrs, res.isCancelled());

                        if (loc)
                            ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobAfterSend.class);
                    }
                    catch (IgniteCheckedException e) {
                        if (log.isDebugEnabled())
                            U.error(log, "Error deserializing job response: " + res, e);

                        finishTask(null, e);

                        // The task was finished with the error above and this job result never received a
                        // response, so there is nothing meaningful to pass to ComputeTask.result(..).
                        // The finally block still releases the occupied/lock flags.
                        return;
                    }
                }

                List<ComputeJobResult> results;

                if (!resCache)
                    results = emptyList();
                else {
                    synchronized (mux) {
                        results = getRemoteResults();
                    }
                }

                ComputeJobResultPolicy plc = result(jobRes, results);

                if (plc == null) {
                    String errMsg = "Failed to obtain remote job result policy for result from ComputeTask.result(..) " +
                        "method that returned null (will fail the whole task): " + jobRes;

                    finishTask(null, new IgniteCheckedException(errMsg));

                    return;
                }

                boolean retry = false;

                synchronized (mux) {
                    // If task is not waiting for responses,
                    // then there is no point to proceed.
                    if (state != State.WAITING) {
                        if (log.isDebugEnabled())
                            log.debug("Ignoring ComputeTask.result(..) value since task is already reducing or " +
                                "finishing [res=" + res + ", job=" + ses + ", state=" + state + ']');

                        return;
                    }

                    if (res.retry()) {
                        // Retry is used only with affinity call / run.
                        assert affCacheIds != null;

                        retry = true;

                        mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.cache().context().exchange().readyAffinityVersion());
                        affFut = ctx.cache().context().exchange().lastTopologyFuture();

                        // Topology is still changing: re-handle this response once it settles (see below).
                        if (affFut != null && !affFut.isDone()) {
                            waitForAffTop = true;

                            jobRes.resetResponse();
                        }
                    }
                    else {
                        switch (plc) {
                            // Start reducing all results received so far.
                            case REDUCE: {
                                state = State.REDUCING;

                                break;
                            }

                            // Keep waiting if there are more responses to come,
                            // otherwise, reduce.
                            case WAIT: {
                                assert results.size() <= this.jobRes.size();

                                // If there are more results to wait for.
                                // If result cache is disabled, then we reduce
                                // when both collections are empty.
                                if (results.size() == this.jobRes.size()) {
                                    plc = ComputeJobResultPolicy.REDUCE;

                                    // All results are received, proceed to reduce method.
                                    state = State.REDUCING;
                                }

                                break;
                            }

                            case FAILOVER: {
                                if (affCacheIds != null) {
                                    mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();

                                    affFut = ctx.cache().context().exchange().lastTopologyFuture();

                                    if (affFut == null || affFut.isDone()) {
                                        affFut = null;

                                        // Need to asynchronously fetch affinity if cache is not started on node.
                                        if (affCacheName != null && ctx.cache().internalCache(affCacheName) == null) {
                                            affFut = ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer);

                                            if (affFut.isDone())
                                                affFut = null;
                                        }
                                    }
                                }

                                if (affFut != null) {
                                    waitForAffTop = true;

                                    jobRes.resetResponse();
                                }
                                else if (!failover(res, jobRes, getTaskTopology()))
                                    plc = null;

                                break;
                            }
                        }
                    }
                }

                // Outside of synchronization.
                if (retry && !waitForAffTop) {
                    // Handle retry with a linearly growing delay.
                    retryAttemptCnt++;

                    final long wait = retryAttemptCnt * RETRY_DELAY_MS;

                    sendRetryRequest(wait, jobRes, res);
                }
                else if (plc != null && !waitForAffTop && !retry) {
                    // Handle failover.
                    if (plc == FAILOVER)
                        sendFailoverRequest(jobRes);
                    else {
                        evtLsnr.onJobFinished(this, jobRes.getSibling());

                        if (plc == ComputeJobResultPolicy.REDUCE)
                            reduce(results);
                    }
                }
            }
            catch (IgniteCheckedException e) {
                if (log.isDebugEnabled())
                    U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);

                finishTask(null, e);

                waitForAffTop = false;
            }
            finally {
                // Open up job for processing responses.
                // Only unset occupied flag, if it was
                // set in this method.
                if (selfOccupied) {
                    assert jobRes != null;

                    synchronized (mux) {
                        jobRes.setOccupied(false);

                        lockRespProc = false;
                    }

                    // Process delayed responses if there are any.
                    res = delayedRess.poll();
                }
            }

            // Re-submit this response once the affinity topology (or affinity cache) future completes.
            if (waitForAffTop && affFut != null) {
                affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                    @Override public void apply(IgniteInternalFuture<?> fut0) {
                        ctx.closure().runLocalSafe(new GridPlainRunnable() {
                            @Override public void run() {
                                onResponse(failoverRes);
                            }
                        }, false);
                    }
                });
            }
        }
    }