in modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java [527:774]
/**
 * Starts a compute task that is identified by a deployed task name, a task class or a task instance,
 * and returns the future of that execution.
 * <p>
 * The three branches below assert that only one of {@code taskName}, {@code taskCls} and {@code task}
 * is used to identify the task. An {@link IgniteCheckedException} raised while resolving or deploying
 * the task is not propagated from the deployment branches: it is remembered and later handed to
 * {@code handleException} together with the created future.
 *
 * @param taskName Name of an already deployed task, or {@code null} if the task is given by class or instance.
 * @param taskCls Task class to deploy, or {@code null}.
 * @param task Task instance to deploy, or {@code null}.
 * @param sesId Task session ID, must not be {@code null}.
 * @param arg Optional task argument, passed to the task worker.
 * @param opts Task execution options; timeout, projection, projection predicate, executor and the
 *      system-task flag are read from it in this method.
 * @return Future bound to the task session created by this method.
 */
private <T, R> ComputeTaskInternalFuture<R> startTask(
@Nullable String taskName,
@Nullable Class<?> taskCls,
@Nullable ComputeTask<T, R> task,
IgniteUuid sesId,
@Nullable T arg,
TaskExecutionOptions opts
) {
assert sesId != null;
// Called first, before any deployment work is done or a session is created.
authorizeUserTask(taskName, taskCls, task, opts);
assert opts.timeout() >= 0;
// A timeout of zero is treated as "no timeout".
long timeout0 = opts.timeout() == 0 ? Long.MAX_VALUE : opts.timeout();
long startTime = U.currentTimeMillis();
long endTime = timeout0 + startTime;
// Account for overflow.
// (With timeout0 == Long.MAX_VALUE the sum above wraps around to a negative value.)
if (endTime < 0)
endTime = Long.MAX_VALUE;
// A deployment failure is stored here rather than thrown, so that it can be reported through the future below.
IgniteCheckedException deployEx = null;
GridDeployment dep = null;
// User provided task name.
if (taskName != null) {
assert taskCls == null;
assert task == null;
try {
// Look up the deployment by task name and resolve the task class from it.
dep = ctx.deploy().getDeployment(taskName);
if (dep == null)
throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " +
"task (was task (re|un)deployed?): " + taskName);
// First element is the resolved class; the second one is used as the cause when the class is missing.
IgniteBiTuple<Class<?>, Throwable> cls = dep.deployedClass(taskName);
if (cls.get1() == null)
throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " +
"task (was task (re|un)deployed?) [taskName=" + taskName + ", dep=" + dep + ']', cls.get2());
taskCls = cls.get1();
// The deployed class must actually be a compute task.
if (!ComputeTask.class.isAssignableFrom(taskCls))
throw new IgniteCheckedException("Failed to auto-deploy task (deployed class is not a task) " +
"[taskName=" +
taskName + ", depCls=" + taskCls + ']');
}
catch (IgniteCheckedException e) {
deployEx = e;
}
}
// Deploy user task class.
else if (taskCls != null) {
assert task == null;
try {
// Implicit deploy.
dep = ctx.deploy().deploy(taskCls, U.detectClassLoader(taskCls));
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " +
"(was task (re|un)deployed?): " + taskCls);
taskName = taskName(dep, taskCls, opts);
}
catch (IgniteCheckedException e) {
// taskName is null in this branch; fall back to the class name so that a session can still be created.
taskName = taskCls.getName();
deployEx = e;
}
}
// Deploy user task.
else if (task != null) {
try {
ClassLoader ldr;
Class<?> cls;
// A peer-deploy-aware task supplies its own deployment class and class loader.
if (task instanceof GridPeerDeployAware) {
GridPeerDeployAware depAware = (GridPeerDeployAware)task;
cls = depAware.deployClass();
ldr = depAware.classLoader();
// Set proper class name to make peer-loading possible.
taskCls = cls;
}
else {
// Otherwise the runtime class of the instance and its detected class loader are used.
taskCls = task.getClass();
assert ComputeTask.class.isAssignableFrom(taskCls);
cls = task.getClass();
ldr = U.detectClassLoader(cls);
}
// Explicit deploy.
dep = ctx.deploy().deploy(cls, ldr);
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " +
"(was task (re|un)deployed?): " + cls);
taskName = taskName(dep, taskCls, opts);
}
catch (IgniteCheckedException e) {
// Same fallback as for the class branch: name the task after the runtime class of the instance.
taskName = task.getClass().getName();
deployEx = e;
}
}
// Every branch above leaves taskName set, either from the deployment or from the fallback in its catch block.
assert taskName != null;
if (log.isDebugEnabled())
log.debug("Task deployment: " + dep);
// Full session support is requested either by the ComputeTaskSessionFullSupport annotation found through
// the deployment, or by a PlatformFullTask instance that reports it.
boolean fullSup = (dep != null && taskCls != null &&
dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null) ||
(task instanceof PlatformFullTask && ((PlatformFullTask)task).taskSessionFullSupport());
// Explicit node ID list; it is only computed when no projection predicate is set.
Collection<UUID> top = null;
final IgnitePredicate<ClusterNode> topPred = opts.projectionPredicate();
if (topPred == null) {
final Collection<ClusterNode> nodes = opts.projection();
top = nodes != null ? nodeIds(nodes) : null;
}
// Stays false when there is no deployment or task class; that case is expected only after a deployment failure.
boolean internal = false;
if (dep == null || taskCls == null)
assert deployEx != null;
else
internal = dep.internalTask(task, taskCls);
// Creates task session with task name and task version.
// The session is created even if deployment failed: the future returned to the caller is built on it.
GridTaskSessionImpl ses = ctx.session().createTaskSession(
sesId,
ctx.localNodeId(),
taskName,
dep,
taskCls == null ? null : taskCls.getName(),
top,
topPred,
startTime,
endTime,
Collections.emptyList(),
emptyMap(),
fullSup,
internal,
opts.executor(),
ctx.security().securityContext()
);
ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
// Failure of saving task metadata; like deployEx, it is reported through the future instead of being thrown.
IgniteCheckedException securityEx = null;
// Metadata is saved only when security is enabled, deployment succeeded and the task is not internal.
// Each deployment branch above either assigns a non-null dep or records deployEx.
if (ctx.security().enabled() && deployEx == null && !dep.internalTask(task, taskCls)) {
try {
saveTaskMetadata(taskName);
}
catch (IgniteCheckedException e) {
securityEx = e;
}
}
if (deployEx == null && securityEx == null) {
// A deployment that cannot be acquired is reported as "Task not deployed".
if (dep == null || !dep.acquire())
handleException(new IgniteDeploymentCheckedException("Task not deployed: " + ses.getTaskName()), fut);
else {
GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(
ctx,
arg,
ses,
fut,
taskCls,
task,
dep,
new TaskEventListener(),
opts,
securitySubjectId(ctx));
// Register the worker under its session ID; an existing entry means the ID was reused.
GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, taskWorker);
assert taskWorker0 == null : "Session ID is not unique: " + sesId;
// A management event is recorded only if that event type is recordable and the deployment reports
// the task as a visor management task.
if (ctx.event().isRecordable(EVT_MANAGEMENT_TASK_STARTED) && dep.visorManagementTask(task, taskCls)) {
// NOTE(review): arg is cast unconditionally - presumably visor management tasks always receive
// a VisorTaskArgument (or null); confirm against callers.
VisorTaskArgument visorTaskArg = (VisorTaskArgument)arg;
Event evt = new ManagementTaskEvent(
ctx.discovery().localNode(),
visorTaskArg != null && visorTaskArg.getArgument() != null
? visorTaskArg.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskName,
taskCls == null ? null : taskCls.getName(),
false,
securitySubjectId(ctx),
visorTaskArg
);
ctx.event().record(evt);
}
if (!ctx.clientDisconnected()) {
// Tasks annotated with ComputeTaskMapAsync are handed to an executor; all others run in the calling thread.
if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
try {
// Start task execution in another thread.
if (opts.isSystemTask())
ctx.pools().getSystemExecutorService().execute(taskWorker);
else
ctx.pools().getExecutorService().execute(taskWorker);
}
catch (RejectedExecutionException e) {
// Undo the registration and the deployment acquisition made above, then fail the future.
tasks.remove(sesId);
release(dep);
handleException(new ComputeExecutionRejectedException("Failed to execute task " +
"due to thread pool execution rejection: " + taskName, e), fut);
}
}
else
taskWorker.run();
}
else
// Client is disconnected: the worker is finished with a disconnected error instead of being run.
taskWorker.finishTask(null, disconnectedError(null));
}
}
else {
// deployEx takes precedence; securityEx can only be set when deployEx is null (see the condition above).
if (deployEx != null)
handleException(deployEx, fut);
else
handleException(securityEx, fut);
}
return fut;
}