in modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java [859:1084]
/**
 * Finishes the job. It sets the final {@code status} and, unless the job has timed out, optionally sends a
 * {@link GridJobExecuteResponse} to the task node. It then records the collected job events and invokes
 * {@code evtLsnr.onJobFinished(this)}.
 * <p>
 * Only the first invocation has any effect (guarded by {@code finishing}); later or concurrent calls return
 * immediately.
 *
 * @param res Job result, may be {@code null}.
 * @param ex Job exception, or {@code null} if the job completed without error. If the result or attributes
 *      fail to marshal, this may be replaced by the converted error or receive it as a suppressed exception.
 * @param sndReply Whether to send a reply to the task node. Forced to {@code false} if the job was cancelled
 *      by the system ({@code sysCancelled}).
 * @param retry If {@code true}, the response includes the current ready affinity version
 *      (presumably so the task node can retry the job — confirm with the response consumer).
 */
void finishJob(
    @Nullable Object res,
    @Nullable IgniteException ex,
    boolean sndReply,
    boolean retry
) {
    // Avoid finishing a job more than once from different threads.
    if (!finishing.compareAndSet(false, true))
        return;

    // Do not send reply if job has been cancelled from system.
    if (sndReply)
        sndReply = !sysCancelled;

    // Look up the task (originating) node; null means it is no longer in the topology.
    ClusterNode sndNode = ctx.discovery().node(taskNode.id());

    finishTime = U.currentTimeMillis();

    // Job events are collected here and recorded in the finally block.
    Collection<IgniteBiTuple<Integer, String>> evts = null;

    try {
        if (ses.isFullSupport())
            evtLsnr.onBeforeJobResponseSent(this);

        // Send response back only if job has not timed out.
        if (!isTimedOut()) {
            if (sndReply) {
                if (sndNode == null) {
                    onMasterNodeLeft();

                    U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() +
                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']');

                    status = FAILED;

                    // Record job reply failure.
                    if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                        evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job);
                }
                else {
                    try {
                        byte[] resBytes = null;
                        byte[] exBytes = null;
                        byte[] attrBytes = null;

                        // Task node is local and local jobs are not marshalled: objects are passed by reference.
                        boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();

                        Map<Object, Object> attrs = jobCtx.getAttributes();

                        // Try to serialize response, and if exception - return to client.
                        if (!loc) {
                            try {
                                resBytes = U.marshal(marsh, res);
                            }
                            catch (IgniteCheckedException e) {
                                resBytes = U.marshal(marsh, null);

                                if (ex != null)
                                    ex.addSuppressed(e);
                                else
                                    ex = U.convertException(e);

                                logError("Failed to serialize job response [nodeId=" + taskNode.id() +
                                    ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
                                    ", resCls=" + (res == null ? null : res.getClass()) + ']', e);
                            }

                            try {
                                attrBytes = U.marshal(marsh, attrs);
                            }
                            catch (IgniteCheckedException e) {
                                attrBytes = U.marshal(marsh, Collections.emptyMap());

                                if (ex != null)
                                    ex.addSuppressed(e);
                                else
                                    ex = U.convertException(e);

                                logError("Failed to serialize job attributes [nodeId=" + taskNode.id() +
                                    ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
                                    ", attrs=" + attrs + ']', e);
                            }

                            try {
                                exBytes = U.marshal(marsh, ex);
                            }
                            catch (IgniteCheckedException e) {
                                // Replace the exception with a message-only one; the marshalling error is logged
                                // but not attached as a cause (NOTE(review): presumably because it may not be
                                // marshallable either).
                                String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() +
                                    ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
                                    ", msg=\"" + e.getMessage() + "\"]";

                                ex = new IgniteException(msg);

                                logError(msg, e);

                                exBytes = U.marshal(marsh, ex);
                            }
                        }

                        // Status and outcome event reflect ex after any serialization failures above.
                        evts = applyJobOutcome(evts, ex);

                        GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
                            ctx.localNodeId(),
                            ses.getId(),
                            ses.getJobId(),
                            exBytes,
                            loc ? ex : null,
                            resBytes,
                            loc ? res : null,
                            attrBytes,
                            loc ? attrs : null,
                            isCancelled(),
                            retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);

                        long timeout = ses.getEndTime() - U.currentTimeMillis();

                        if (timeout <= 0)
                            // Ignore the actual timeout and send response anyway.
                            timeout = 1;

                        if (ses.isFullSupport()) {
                            // Send response to designated job topic.
                            // Always go through communication to preserve order,
                            // if attributes are enabled.
                            ctx.io().sendOrderedMessage(
                                sndNode,
                                taskTopic,
                                jobRes,
                                internal ? MANAGEMENT_POOL : SYSTEM_POOL,
                                timeout,
                                false);
                        }
                        else if (ctx.localNodeId().equals(sndNode.id()))
                            ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
                        else
                            // Send response to common topic as unordered message.
                            ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
                    }
                    // NOTE(review): in the two handlers below, status keeps the value set by applyJobOutcome
                    // (possibly FINISHED) even though the reply was not delivered, unlike the sndNode == null
                    // branch, which sets FAILED. Confirm this is intended.
                    catch (IgniteCheckedException e) {
                        // Log and invoke the master-leave callback.
                        if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(taskNode.id())) {
                            onMasterNodeLeft();

                            // Avoid stack trace for left nodes.
                            U.warn(log, "Failed to reply to sender node because it left grid " +
                                "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() +
                                ", ses=" + ses + ", job=" + job + ']');
                        }
                        else
                            logError("Error sending reply for job [nodeId=" + sndNode.id() + ", jobId=" +
                                ses.getJobId() + ", ses=" + ses + ", job=" + job + ']', e);

                        if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                            evts = addEvent(evts, EVT_JOB_FAILED, "Failed to send reply for job [nodeId=" +
                                taskNode.id() + ", job=" + job + ']');
                    }
                    // Broad catch: an interrupted exception has been observed to surface here; any
                    // unexpected exception is logged and recorded as a job failure instead of propagating.
                    catch (Exception e) {
                        String msg = "Failed to send reply for job [nodeId=" + taskNode.id() + ", job=" + job + ']';

                        logError(msg, e);

                        if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                            evts = addEvent(evts, EVT_JOB_FAILED, msg);
                    }
                }
            }
            else
                // No reply required: only set the final status and collect the outcome event.
                evts = applyJobOutcome(evts, ex);
        }
        else {
            // Job timed out.
            status = FAILED;

            if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
        }
    }
    finally {
        try {
            if (evts != null) {
                for (IgniteBiTuple<Integer, String> t : evts)
                    recordEvent(t.get1(), t.get2());
            }
        }
        finally {
            // Listener callback: runs even if recording an event throws.
            evtLsnr.onJobFinished(this);
        }
    }
}

/**
 * Sets the final {@code status} from the job outcome. For non-internal jobs it also collects the matching
 * job event, if that event type is recordable:
 * <ul>
 *     <li>{@code ex != null} and the job was started: {@code FAILED}, {@code EVT_JOB_FAILED};</li>
 *     <li>{@code ex != null} and the job was not started: {@code FAILED}, {@code EVT_JOB_REJECTED};</li>
 *     <li>{@code ex == null}: {@code FINISHED}, {@code EVT_JOB_FINISHED} (no message).</li>
 * </ul>
 *
 * @param evts Events collected so far, may be {@code null}.
 * @param ex Job exception, or {@code null} on success.
 * @return The value returned by {@code addEvent} if an event was added, otherwise {@code evts} unchanged.
 */
private Collection<IgniteBiTuple<Integer, String>> applyJobOutcome(
    @Nullable Collection<IgniteBiTuple<Integer, String>> evts,
    @Nullable IgniteException ex
) {
    if (ex != null) {
        status = FAILED;

        if (isStarted) {
            // Job failed.
            if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex +
                    ", job=" + job + ']');
        }
        else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED))
            evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex +
                ", job=" + job + ']');
    }
    else {
        status = FINISHED;

        if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
            evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
    }

    return evts;
}