private IgniteInternalFuture handleAsyncUnsafe()

in modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java [181:402]


    /**
     * Handles a task REST request. Three commands are processed here:
     * <ul>
     *     <li>{@code EXE} - starts a task, either on the local node or routed to the destination node
     *     given in the request;</li>
     *     <li>{@code RESULT} - looks up the state/result of a previously started task by an id of the
     *     form {@code <taskId>~<resultHolderNodeId>};</li>
     *     <li>{@code NOOP} - completes immediately with an empty response.</li>
     * </ul>
     * For a synchronous {@code EXE} the returned future is completed from the task future listener, i.e.
     * only once the task has finished. For an asynchronous {@code EXE} the returned future is completed
     * before this method returns.
     *
     * @param req Request, must be a {@link GridRestTaskRequest} with one of the supported commands.
     * @return Future that is completed with the REST response (or with an error for a failed synchronous task).
     * @throws IgniteCheckedException If a mandatory parameter ({@code name} for {@code EXE}, {@code id} for
     *      {@code RESULT}) is missing, the task id cannot be parsed, the task is unknown to the result
     *      holder node, or the requested task result carries an error.
     */
    private IgniteInternalFuture<GridRestResponse> handleAsyncUnsafe(final GridRestRequest req) throws IgniteCheckedException {
        assert req instanceof GridRestTaskRequest : "Invalid command for task handler: " + req;

        assert SUPPORTED_COMMANDS.contains(req.command());

        if (log.isDebugEnabled())
            log.debug("Handling task REST request: " + req);

        GridRestTaskRequest req0 = (GridRestTaskRequest)req;

        final GridFutureAdapter<GridRestResponse> fut = new GridFutureAdapter<>();

        final GridRestResponse res = new GridRestResponse();

        final GridClientTaskResultBean taskRestRes = new GridClientTaskResultBean();

        // Set ID placeholder for the case it wouldn't be available due to remote execution.
        taskRestRes.setId('~' + ctx.localNodeId().toString());

        // Execute locally when no destination is given, the destination is this node,
        // or the destination node is not known to discovery.
        final boolean locExec = req0.destinationId() == null || req0.destinationId().equals(ctx.localNodeId()) ||
            ctx.discovery().node(req0.destinationId()) == null;

        switch (req.command()) {
            case EXE: {
                final boolean async = req0.async();

                final String name = req0.taskName();

                if (F.isEmpty(name))
                    throw new IgniteCheckedException(missingParameter("name"));

                final List<Object> params = req0.params();

                long timeout = req0.timeout();

                final IgniteInternalFuture<Object> taskFut;

                if (locExec) {
                    // No parameters -> null argument, single parameter -> passed as is, several -> Object[].
                    Object arg = !F.isEmpty(params) ? params.size() == 1 ? params.get(0) : params.toArray() : null;

                    taskFut = ctx.task().execute(name, arg, options().asPublicRequest().withTimeout(timeout));
                }
                else {
                    // Using predicate instead of node intentionally
                    // in order to provide user well-structured EmptyProjectionException.
                    ClusterGroup prj = ctx.grid().cluster().forPredicate(nodeForNodeId(req.destinationId()));

                    taskFut = ctx.closure().callAsync(
                        BALANCE,
                        new ExeCallable(name, params, timeout),
                        options(prj.nodes()).withFailoverDisabled()
                    );
                }

                if (async) {
                    if (locExec) {
                        IgniteUuid tid = ((ComputeTaskInternalFuture)taskFut).getTaskSession().getId();

                        // Register a "not finished yet" descriptor so that RESULT requests can find the task.
                        taskDescs.put(tid, new TaskDescriptor(false, null, null));

                        // Id returned to the client: <taskId>~<localNodeId>, the format parsed by RESULT below.
                        taskRestRes.setId(tid.toString() + '~' + ctx.localNodeId().toString());

                        res.setResponse(taskRestRes);
                    }
                    else
                        // NOTE(review): the routed call above has already been submitted at this point,
                        // so the task is still started although an error is reported to the client.
                        res.setError("Asynchronous task execution is not supported for routing request.");

                    // Asynchronous requests are answered right away, without waiting for the task.
                    fut.onDone(res);
                }

                taskFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
                    @Override public void apply(IgniteInternalFuture<Object> taskFut) {
                        try {
                            TaskDescriptor desc;

                            try {
                                desc = new TaskDescriptor(true, taskFut.get(), null);
                            }
                            catch (IgniteCheckedException e) {
                                // Topology related failures are logged as warnings, everything else as errors
                                // (except failures caused by VisorClusterGroupEmptyException, which are not logged).
                                if (e.hasCause(ClusterTopologyCheckedException.class, ClusterGroupEmptyCheckedException.class))
                                    U.warn(log, "Failed to execute task due to topology issues (are all mapped " +
                                        "nodes alive?) [name=" + name + ", clientId=" + req.clientId() +
                                        ", err=" + e + ']');
                                else {
                                    if (!X.hasCause(e, VisorClusterGroupEmptyException.class))
                                        U.error(log, "Failed to execute task [name=" + name + ", clientId=" +
                                            req.clientId() + ']', e);
                                }

                                desc = new TaskDescriptor(true, null, e);
                            }

                            if (async && locExec) {
                                assert taskFut instanceof ComputeTaskInternalFuture;

                                IgniteUuid tid = ((ComputeTaskInternalFuture)taskFut).getTaskSession().getId();

                                // Replace the "not finished" descriptor with the final one for later RESULT requests.
                                taskDescs.put(tid, desc);
                            }

                            if (!async) {
                                // Synchronous request: the REST future is completed here, once the task is done.
                                if (desc.error() == null) {
                                    try {
                                        taskRestRes.setFinished(true);
                                        taskRestRes.setResult(desc.result());

                                        res.setResponse(taskRestRes);
                                        fut.onDone(res);
                                    }
                                    catch (IgniteException e) {
                                        fut.onDone(new IgniteCheckedException("Failed to marshal task result: " +
                                            desc.result(), e));
                                    }
                                }
                                else
                                    fut.onDone(desc.error());
                            }
                        }
                        finally {
                            // Safety net: never leave a synchronous caller with a future that is not completed,
                            // even if something above threw unexpectedly.
                            if (!async && !fut.isDone())
                                fut.onDone(new IgniteCheckedException("Failed to execute task (see server logs for details)."));
                        }
                    }
                });

                break;
            }

            case RESULT: {
                String id = req0.taskId();

                if (F.isEmpty(id))
                    throw new IgniteCheckedException(missingParameter("id"));

                // Expected id format: <taskId>~<resultHolderNodeId>.
                StringTokenizer st = new StringTokenizer(id, "~");

                if (st.countTokens() != 2)
                    throw new IgniteCheckedException("Failed to parse id parameter: " + id);

                String tidParam = st.nextToken();
                String resHolderIdParam = st.nextToken();

                taskRestRes.setId(id);

                try {
                    IgniteUuid tid = !F.isEmpty(tidParam) ? IgniteUuid.fromString(tidParam) : null;

                    UUID resHolderId = !F.isEmpty(resHolderIdParam) ? UUID.fromString(resHolderIdParam) : null;

                    if (tid == null || resHolderId == null)
                        throw new IgniteCheckedException("Failed to parse id parameter: " + id);

                    if (ctx.localNodeId().equals(resHolderId)) {
                        // The result is held by this node: look it up in the local descriptors map.
                        TaskDescriptor desc = taskDescs.get(tid);

                        if (desc == null)
                            throw new IgniteCheckedException("Task with provided id has never been started on provided node" +
                                " [taskId=" + tidParam + ", taskResHolderId=" + resHolderIdParam + ']');

                        taskRestRes.setFinished(desc.finished());

                        if (desc.error() != null)
                            throw new IgniteCheckedException(desc.error().getMessage());

                        taskRestRes.setResult(desc.result());

                        res.setResponse(taskRestRes);
                    }
                    else {
                        // The result is held by another node: request it from there.
                        // First tuple element is an error message (if any), second is the response.
                        IgniteBiTuple<String, GridTaskResultResponse> t = requestTaskResult(resHolderId, tid);

                        if (t.get1() != null)
                            throw new IgniteCheckedException(t.get1());

                        GridTaskResultResponse taskRes = t.get2();

                        assert taskRes != null;

                        if (!taskRes.found())
                            throw new IgniteCheckedException("Task with provided id has never been started on provided node " +
                                "[taskId=" + tidParam + ", taskResHolderId=" + resHolderIdParam + ']');

                        taskRestRes.setFinished(taskRes.finished());

                        if (taskRes.error() != null)
                            throw new IgniteCheckedException(taskRes.error());

                        taskRestRes.setResult(taskRes.result());

                        res.setResponse(taskRestRes);
                    }
                }
                catch (IllegalArgumentException e) {
                    // Malformed id parts are reported to the caller as a checked exception.
                    String msg = "Failed to parse parameters [taskId=" + tidParam + ", taskResHolderId="
                        + resHolderIdParam + ", err=" + e.getMessage() + ']';

                    if (log.isDebugEnabled())
                        log.debug(msg);

                    throw new IgniteCheckedException(msg, e);
                }

                fut.onDone(res);

                break;
            }

            case NOOP: {
                fut.onDone(new GridRestResponse());

                break;
            }

            default:
                assert false : "Invalid command for task handler: " + req;
        }

        // For a synchronous EXE the response may not be populated yet at this point,
        // since it is filled in by the task future listener.
        if (log.isDebugEnabled())
            log.debug("Handled task REST request [res=" + res + ", req=" + req + ']');

        return fut;
    }