in modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java [1214:1463]
/**
 * Handles a job execution request received from another (or the local) node.
 * <p>
 * Under the processor read lock this method resolves the deployment for the requested task,
 * rebuilds the task session, job session and job context (unmarshalling whatever the request
 * carries only in serialized form), and creates a {@link GridJobWorker}. Depending on the job
 * and processor state the worker is then:
 * <ul>
 *     <li>run via {@code runSync} while the lock is still held (internal jobs);</li>
 *     <li>passed to {@code executeAsync} (always-activate mode, request sent by the local node);</li>
 *     <li>run on the calling thread after the lock is released (always-activate mode, request
 *     sent by a different node);</li>
 *     <li>stored in {@code passiveJobs} followed by {@code handleCollisions()} (otherwise).</li>
 * </ul>
 * If the read lock cannot be taken the request is ignored. A missing/unacquirable deployment or
 * an unmarshalling failure is logged and reported through {@code handleException}.
 *
 * @param node Node the request came from; its ID is used for deployment lookup, session creation
 *      and to detect requests sent by the local node.
 * @param req Job execution request.
 */
public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteRequest req) {
if (log.isDebugEnabled())
log.debug("Received job request message [req=" + req + ", nodeId=" + node.id() + ']');
// A partitions reservation is created only for requests that carry cache IDs; otherwise
// null is passed to the job worker.
PartitionsReservation partsReservation = null;
if (req.getCacheIds() != null) {
assert req.getPartition() >= 0 : req;
assert !F.isEmpty(req.getCacheIds()) : req;
partsReservation = new PartitionsReservation(req.getCacheIds(), req.getPartition(), req.getTopVer());
}
// Stays non-null only when the job has to be run on this thread after the read lock
// is released (see the end of the method); every other path resets it to null.
GridJobWorker job = null;
// Failing to take the read lock is treated as "node is stopping" (per the message below):
// the request is dropped without a response.
if (!rwLock.tryReadLock()) {
if (log.isDebugEnabled())
log.debug("Received job execution request while stopping this node (will ignore): " + req);
return;
}
try {
long endTime = req.getCreateTime() + req.getTimeout();
// Account for overflow: a sum that wrapped around to a negative value is clamped to Long.MAX_VALUE.
if (endTime < 0)
endTime = Long.MAX_VALUE;
// Resolve the deployment: local lookup by task class name when the request forces local
// deployment, global lookup (keyed by sender node and class loader ID) otherwise.
GridDeployment tmpDep = req.isForceLocalDeployment() ?
ctx.deploy().getLocalDeployment(req.getTaskClassName()) :
ctx.deploy().getGlobalDeployment(
req.getDeploymentMode(),
req.getTaskName(),
req.getTaskClassName(),
req.getUserVersion(),
node.id(),
req.getClassLoaderId(),
req.getLoaderParticipants(),
null);
if (tmpDep == null) {
if (log.isDebugEnabled())
log.debug("Checking local tasks...");
// Check local tasks: fall back to the first deployment used by the task processor
// whose class loader ID matches the one in the request.
for (Map.Entry<String, GridDeployment> d : ctx.task().getUsedDeploymentMap().entrySet()) {
if (d.getValue().classLoaderId().equals(req.getClassLoaderId())) {
assert d.getValue().local();
tmpDep = d.getValue();
break;
}
}
}
final GridDeployment dep = tmpDep;
if (log.isDebugEnabled())
log.debug("Deployment: " + dep);
// While true, the finally block below releases the deployment. It is switched off
// once the job worker has been created, right before job initialization.
boolean releaseDep = true;
try {
// Proceed only if a deployment was found and acquire() succeeded.
if (dep != null && dep.acquire()) {
GridJobSessionImpl jobSes;
GridJobContextImpl jobCtx;
try {
// Siblings are taken from the request unless they are dynamic; in that case null is
// passed to the session.
List<ComputeJobSibling> siblings = null;
if (!req.isDynamicSiblings()) {
Collection<ComputeJobSibling> siblings0 = req.getSiblings();
if (siblings0 == null) {
assert req.getSiblingsBytes() != null;
// Unlike the attributes below, siblings are unmarshalled with a class loader resolved
// from the configuration only (the deployment class loader is not passed).
siblings0 = U.unmarshal(marsh, req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
}
siblings = new ArrayList<>(siblings0);
}
// Session attributes are only read when the request has full session support enabled.
Map<Object, Object> sesAttrs = null;
if (req.isSessionFullSupport()) {
sesAttrs = req.getSessionAttributes();
if (sesAttrs == null)
sesAttrs = U.unmarshal(marsh, req.getSessionAttributesBytes(),
U.resolveClassLoader(dep.classLoader(), ctx.config()));
}
// Topology predicate is optional: it is unmarshalled only when the request carries its bytes.
IgnitePredicate<ClusterNode> topPred = req.getTopologyPredicate();
if (topPred == null && req.getTopologyPredicateBytes() != null) {
topPred = U.unmarshal(marsh, req.getTopologyPredicateBytes(),
U.resolveClassLoader(dep.classLoader(), ctx.config()));
}
// Note that we unmarshal session/job attributes here with proper class loader.
GridTaskSessionImpl taskSes = ctx.session().createTaskSession(
req.getSessionId(),
node.id(),
req.getTaskName(),
dep,
req.getTaskClassName(),
req.topology(),
topPred,
req.getStartTaskTime(),
endTime,
siblings,
sesAttrs,
req.isSessionFullSupport(),
req.isInternal(),
req.executorName(),
ctx.security().securityContext()
);
taskSes.setCheckpointSpi(req.getCheckpointSpi());
taskSes.setClassLoader(dep.classLoader());
jobSes = new GridJobSessionImpl(ctx, taskSes, req.getJobId());
Map<? extends Serializable, ? extends Serializable> jobAttrs = req.getJobAttributes();
if (jobAttrs == null)
jobAttrs = U.unmarshal(marsh, req.getJobAttributesBytes(),
U.resolveClassLoader(dep.classLoader(), ctx.config()));
jobCtx = new GridJobContextImpl(ctx, req.getJobId(), jobAttrs);
}
catch (IgniteCheckedException e) {
// A checked failure from the block above (in practice unmarshalling) is wrapped, logged and
// reported through handleException. The return still runs both finally blocks, so the
// acquired deployment is released (releaseDep is still true) and the read lock is unlocked.
IgniteException ex = new IgniteException("Failed to deserialize task attributes " +
"[taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() +
", codeVer=" + req.getUserVersion() + ", taskClsLdr=" + dep.classLoader() + ']', e);
U.error(log, ex.getMessage(), e);
handleException(node, req, ex, endTime);
return;
}
job = new GridJobWorker(
ctx,
dep,
req.getCreateTime(),
jobSes,
jobCtx,
req.getJobBytes(),
req.getJob(),
node,
req.isInternal(),
evtLsnr,
holdLsnr,
partsReservation,
req.getTopVer(),
req.executorName(),
this::computeJobWorkerInterruptTimeout
);
jobCtx.job(job);
// If exception occurs on job initialization, deployment is released in job listener.
releaseDep = false;
if (job.initialize(dep, dep.deployedClass(req.getTaskClassName()).get1())) {
// Internal jobs will always be executed synchronously.
if (job.isInternal()) {
// This is an internal job and can be executed inside busy lock
// since job is expected to be short.
// This is essential for proper stop without races.
runSync(job);
// No execution outside lock.
job = null;
}
else if (jobAlwaysActivate) {
// Always-activate mode: the job does not go through passiveJobs/handleCollisions().
if (onBeforeActivateJob(job)) {
if (ctx.localNodeId().equals(node.id())) {
// Always execute in another thread for local node.
executeAsync(job);
// No sync execution.
job = null;
}
else {
// NOTE(review): the 'if' below has no braces, so only startedJobsCnt is guarded by
// metricsUpdateFreq; startedJobsMetric is incremented unconditionally. Looks
// intentional, but confirm. 'job' is left non-null here and is run after the
// read lock is released.
if (metricsUpdateFreq > -1L)
// Job will be executed synchronously.
startedJobsCnt.increment();
startedJobsMetric.increment();
}
}
else
// Job has been cancelled.
// Set to null, to avoid sync execution.
job = null;
}
else {
// Register the job as passive (waiting) and trigger collision handling. An already
// registered job with the same ID is kept and the duplicate request is only logged.
GridJobWorker old = passiveJobs.putIfAbsent(job.getJobId(), job);
if (old == null) {
waitingJobsMetric.increment();
handleCollisions();
}
else
U.error(log, "Received computation request with duplicate job ID (could be " +
"network malfunction, source node may hang if task timeout was not set) " +
"[srcNode=" + node.id() +
", jobId=" + req.getJobId() + ", sesId=" + req.getSessionId() +
", locNodeId=" + ctx.localNodeId() + ']');
// No sync execution.
job = null;
}
}
else
// Job was not initialized, no execution.
job = null;
}
else {
// Deployment is null or could not be acquired: log and report the failure.
IgniteException ex = new IgniteDeploymentException("Task was not deployed or was redeployed since " +
"task execution [taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() +
", codeVer=" + req.getUserVersion() + ", clsLdrId=" + req.getClassLoaderId() +
", seqNum=" + req.getClassLoaderId().localId() + ", depMode=" + req.getDeploymentMode() +
", dep=" + dep + ']');
U.error(log, ex.getMessage(), ex);
handleException(node, req, ex, endTime);
}
}
finally {
// Release the deployment unless responsibility for it was handed over (releaseDep == false).
// NOTE(review): this also runs when dep != null but dep.acquire() returned false, i.e. release
// is called for a deployment that was not acquired in this method - confirm this is intended.
if (dep != null && releaseDep)
release(dep);
}
}
finally {
rwLock.readUnlock();
}
// Only reached with a non-null job in always-activate mode for a request sent by a different
// node: the job is run on the calling thread, outside of the read lock.
if (job != null)
job.run();
}