void finishJob()

in modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java [859:1084]


    /**
     * Finishes this job: determines the final {@code status}, optionally sends a
     * {@link GridJobExecuteResponse} back to the task node, records the collected job events
     * and notifies the event listener via {@code onJobFinished}.
     * <p>
     * Only the first invocation does any work. Subsequent invocations return immediately
     * because of the {@code finishing} compare-and-set guard; since that return happens before
     * the {@code try} block, the listener is not notified again for them.
     * <p>
     * No response is sent when the job has timed out, when {@code sndReply} is {@code false},
     * or when the job was cancelled by the system ({@code sysCancelled}).
     *
     * @param res Job result. Marshalled into the response unless the response is delivered
     *      locally without marshalling, in which case the object itself is passed.
     * @param ex Job exception or {@code null}. A non-null value (including one created here
     *      because result/attribute marshalling failed) makes the final status {@code FAILED}.
     * @param sndReply If {@code true}, a response is sent to the task node, unless the job
     *      was cancelled by the system.
     * @param retry If {@code true}, the response carries the ready affinity topology version;
     *      otherwise {@code null} is passed in its place.
     */
    void finishJob(
        @Nullable Object res,
        @Nullable IgniteException ex,
        boolean sndReply,
        boolean retry
    ) {
        // Avoid finishing a job more than once from different threads:
        // only the caller that flips the flag from false to true proceeds.
        if (!finishing.compareAndSet(false, true))
            return;

        // Do not send reply if job has been cancelled from system.
        if (sndReply)
            sndReply = !sysCancelled;

        // Resolve the task (sender) node through discovery up front. A null result is handled
        // below as "task node left the grid".
        ClusterNode sndNode = ctx.discovery().node(taskNode.id());

        finishTime = U.currentTimeMillis();

        // (event type, message) pairs accumulated during finishing; they are recorded in the
        // finally block. Stays null if nothing is recordable.
        // NOTE(review): addEvent appears to create the collection on first use and return it -
        // confirm against its definition.
        Collection<IgniteBiTuple<Integer, String>> evts = null;

        try {
            // This callback runs before the timeout check, i.e. even if no response ends up being sent.
            if (ses.isFullSupport())
                evtLsnr.onBeforeJobResponseSent(this);

            // Send response back only if job has not timed out.
            if (!isTimedOut()) {
                if (sndReply) {
                    if (sndNode == null) {
                        // Task node is no longer known to discovery: nobody to reply to.
                        onMasterNodeLeft();

                        U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() +
                            ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']');

                        status = FAILED;

                        // Record job reply failure.
                        if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                            evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job);
                    }
                    else {
                        try {
                            byte[] resBytes = null;
                            byte[] exBytes = null;
                            byte[] attrBytes = null;

                            // Local delivery without marshalling: the task node is this node and the
                            // configuration does not request marshalling of local jobs. In that case the
                            // byte arrays above stay null and the objects themselves go into the response.
                            boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();

                            Map<Object, Object> attrs = jobCtx.getAttributes();

                            // Try to serialize response, and if exception - return to client.
                            // Note: the fallback U.marshal calls inside the catch handlers below are not
                            // guarded themselves; a failure there propagates to the outer catch blocks.
                            if (!loc) {
                                try {
                                    resBytes = U.marshal(marsh, res);
                                }
                                catch (IgniteCheckedException e) {
                                    // Result cannot be marshalled: send a marshalled null instead and report
                                    // the failure through the job exception.
                                    resBytes = U.marshal(marsh, null);

                                    if (ex != null)
                                        ex.addSuppressed(e);
                                    else
                                        ex = U.convertException(e);

                                    logError("Failed to serialize job response [nodeId=" + taskNode.id() +
                                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
                                        ", resCls=" + (res == null ? null : res.getClass()) + ']', e);
                                }

                                try {
                                    attrBytes = U.marshal(marsh, attrs);
                                }
                                catch (IgniteCheckedException e) {
                                    // Attributes cannot be marshalled: send an empty map instead and report
                                    // the failure through the job exception.
                                    attrBytes = U.marshal(marsh, Collections.emptyMap());

                                    if (ex != null)
                                        ex.addSuppressed(e);
                                    else
                                        ex = U.convertException(e);

                                    logError("Failed to serialize job attributes [nodeId=" + taskNode.id() +
                                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
                                        ", attrs=" + attrs + ']', e);
                                }

                                // The exception is marshalled last, after it may have been created or
                                // augmented by the two steps above.
                                try {
                                    exBytes = U.marshal(marsh, ex);
                                }
                                catch (IgniteCheckedException e) {
                                    String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() +
                                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
                                        ", msg=\"" + e.getMessage() + "\"]";

                                    // Replace the exception with a fresh one built from the message text only
                                    // (no cause attached) and marshal that instead.
                                    // NOTE(review): presumably the cause is omitted so the replacement is
                                    // guaranteed to be marshallable - confirm.
                                    ex = new IgniteException(msg);

                                    logError(msg, e);

                                    exBytes = U.marshal(marsh, ex);
                                }
                            }

                            // Final status is decided here, before the response is sent. 'ex' may have been
                            // set above by a marshalling failure even if the job itself succeeded.
                            if (ex != null) {
                                status = FAILED;

                                if (isStarted) {
                                    // Job failed.
                                    if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                                        evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" +
                                            ex + ", job=" + job + ']');
                                }
                                // Failed without having been started: reported as rejected.
                                else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED))
                                    evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started " +
                                        "[ex=" + ex + ", job=" + job + ']');
                            }
                            else {
                                status = FINISHED;

                                if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
                                    evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
                            }

                            // Either the marshalled bytes (remote / marshalled-local case) or the raw
                            // objects (loc == true) are populated, never both.
                            GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
                                ctx.localNodeId(),
                                ses.getId(),
                                ses.getJobId(),
                                exBytes,
                                loc ? ex : null,
                                resBytes,
                                loc ? res : null,
                                attrBytes,
                                loc ? attrs : null,
                                isCancelled(),
                                retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);

                            // Time left until the session end time; used as the ordered-message timeout.
                            long timeout = ses.getEndTime() - U.currentTimeMillis();

                            if (timeout <= 0)
                                // Ignore the actual timeout and send response anyway.
                                timeout = 1;

                            // Internal jobs reply through the management pool, all others through the system pool.
                            if (ses.isFullSupport()) {
                                // Send response to designated job topic.
                                // Always go through communication to preserve order,
                                // if attributes are enabled.
                                ctx.io().sendOrderedMessage(
                                    sndNode,
                                    taskTopic,
                                    jobRes,
                                    internal ? MANAGEMENT_POOL : SYSTEM_POOL,
                                    timeout,
                                    false);
                            }
                            // Task node is the local node: hand the response to the task processor directly.
                            else if (ctx.localNodeId().equals(sndNode.id()))
                                ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
                            else
                                // Send response to common topic as unordered message.
                                ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
                        }
                        // NOTE(review): neither catch block below touches 'status'. If the failure happens
                        // after the status was set to FINISHED above, the status stays FINISHED while an
                        // EVT_JOB_FAILED event is added - confirm this is intended.
                        catch (IgniteCheckedException e) {
                            // Log and invoke the master-leave callback.
                            if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(taskNode.id())) {
                                onMasterNodeLeft();

                                // Avoid stack trace for left nodes.
                                U.warn(log, "Failed to reply to sender node because it left grid " +
                                    "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() +
                                    ", ses=" + ses + ", job=" + job + ']');
                            }
                            else
                                logError("Error sending reply for job [nodeId=" + sndNode.id() + ", jobId=" +
                                    ses.getJobId() + ", ses=" + ses + ", job=" + job + ']', e);

                            if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                                evts = addEvent(evts, EVT_JOB_FAILED, "Failed to send reply for job [nodeId=" +
                                    taskNode.id() + ", job=" + job + ']');
                        }
                        // Catching interrupted exception because
                        // it gets thrown for some reason.
                        // NOTE(review): the thread's interrupt flag is not restored here - confirm whether
                        // callers rely on it.
                        catch (Exception e) {
                            String msg = "Failed to send reply for job [nodeId=" + taskNode.id() + ", job=" + job + ']';

                            logError(msg, e);

                            if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                                evts = addEvent(evts, EVT_JOB_FAILED, msg);
                        }
                    }
                }
                else {
                    // No reply requested (or reply suppressed by system cancellation): only update
                    // the status and collect the matching event.
                    if (ex != null) {
                        status = FAILED;

                        if (isStarted) {
                            if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                                evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex +
                                    ", job=" + job + ']');
                        }
                        // Failed without having been started: reported as rejected.
                        else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED))
                            evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex +
                                ", job=" + job + ']');
                    }
                    else {
                        status = FINISHED;

                        if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
                            evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
                    }
                }
            }
            else {
                // Job timed out: no response is sent regardless of 'sndReply'.
                status = FAILED;

                if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                    evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
            }
        }
        finally {
            // Record everything collected above, in insertion order, even if the try block threw.
            if (evts != null) {
                for (IgniteBiTuple<Integer, String> t : evts)
                    recordEvent(t.get1(), t.get2());
            }

            // Listener callback. Always invoked once the finishing guard has been passed.
            evtLsnr.onJobFinished(this);
        }
    }