private ListFieldsQueryCursor mapAndExecutePlan()

in modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java [578:794]


    /**
     * Initializes an execution plan from the given multi-step plan, builds and runs the first fragment
     * with a local execution context, sends a start request for every other fragment to each node of
     * that fragment's mapping, and wraps the root query iterator into a cursor.
     *
     * @param qry Root query being executed.
     * @param plan Multi-step plan to initialize and execute.
     * @return Cursor over the rows produced by the root query iterator.
     * @throws IgniteSQLException With code {@code QUERY_CANCELED} if {@code qry.remainingTime()}
     *      returns {@code 0}.
     */
    private ListFieldsQueryCursor<?> mapAndExecutePlan(
        RootQuery<Row> qry,
        MultiStepPlan plan
    ) {
        // NOTE(review): presumably moves the query into its mapping state - confirm against RootQuery.
        qry.mapping();

        Map<String, Object> qryParams = Commons.parametersMap(qry.parameters());

        MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context(), qryParams);

        ExecutionPlan execPlan = plan.init(mappingSvc, partSvc, mapCtx);

        List<Fragment> fragments = execPlan.fragments();

        // With security enabled, permissions are checked against the root of every fragment,
        // not only the one executed locally.
        if (ctx.security().enabled()) {
            for (Fragment fragment : fragments)
                checkPermissions(fragment.root());
        }

        // Local execution: the first fragment of the plan is the one built and run on this node.
        Fragment fragment = F.first(fragments);

        // Sanity check (only when assertions are enabled): the first fragment must exist and be mapped
        // either to the local node alone or to no nodes at all.
        if (U.assertionsEnabled()) {
            assert fragment != null;

            FragmentMapping mapping = execPlan.mapping(fragment);

            assert mapping != null;

            List<UUID> nodes = mapping.nodeIds();

            assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
                    : "nodes=" + nodes + ", localNode=" + localNodeId();
        }

        // The same remaining time value is later passed to the local execution context and to every
        // remote start request. Only exactly 0 is treated as an already expired timeout.
        // NOTE(review): the value returned when no timeout is configured is not visible here - confirm.
        long timeout = qry.remainingTime();

        if (timeout == 0) {
            throw new IgniteSQLException("The query was cancelled due to timeout", IgniteQueryErrorCode.QUERY_CANCELED,
                new QueryCancelledException());
        }

        FragmentDescription fragmentDesc = new FragmentDescription(
            fragment.fragmentId(),
            execPlan.mapping(fragment),
            execPlan.target(fragment),
            execPlan.remotes(fragment));

        MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());

        // User transaction resolved from the query context; when it is null, no transaction changes
        // are handed to the execution context.
        final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context());

        // NOTE(review): locNodeId is passed twice - presumably once as the originating node and once as
        // the local node; confirm against the ExecutionContext constructor.
        ExecutionContext<Row> ectx = new ExecutionContext<>(
            qry.context(),
            taskExecutor(),
            injectSvc,
            qry.id(),
            locNodeId,
            locNodeId,
            mapCtx.topologyVersion(),
            fragmentDesc,
            handler,
            qryMemoryTracker,
            createIoTracker(locNodeId, qry.localQueryId()),
            timeout,
            qryParams,
            userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));

        // Build the executable node tree for the first fragment's relational root.
        Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
            exchangeService(), failureProcessor()).go(fragment.root());

        // The query is started with the local node tree before any remote start request is sent.
        qry.run(ectx, execPlan, plan.fieldsMetadata(), node);

        // For every node, count how many of the non-first fragments are mapped to it. The count for the
        // target node is included in each start request sent to that node.
        Map<UUID, Long> fragmentsPerNode = fragments.stream()
            .skip(1)
            .flatMap(f -> f.mapping().nodeIds().stream())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // Start remote execution.
        for (int i = 1; i < fragments.size(); i++) {
            fragment = fragments.get(i);
            fragmentDesc = new FragmentDescription(
                fragment.fragmentId(),
                execPlan.mapping(fragment),
                execPlan.target(fragment),
                execPlan.remotes(fragment));

            // Both variables are reset for every fragment.
            // ex: first failure seen while sending this fragment; once set, the remaining nodes of the
            // fragment are not contacted and the same error is reported for each of them.
            // parametersMarshalled: marshalled parameters taken from a sent request and reused in the
            // requests for the following nodes of this fragment.
            Throwable ex = null;
            byte[] parametersMarshalled = null;

            for (UUID nodeId : fragmentDesc.nodeIds()) {
                if (ex != null)
                    qry.onResponse(nodeId, fragment.fragmentId(), ex);
                else {
                    try {
                        // May be null, in which case no session attributes are sent.
                        SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class);

                        QueryStartRequest req = new QueryStartRequest(
                            qry.id(),
                            qry.localQueryId(),
                            qry.context().schemaName(),
                            fragment.serialized(),
                            ectx.topologyVersion(),
                            fragmentDesc,
                            fragmentsPerNode.get(nodeId).intValue(),
                            qry.parameters(),
                            parametersMarshalled,
                            timeout,
                            ectx.getQryTxEntries(),
                            sesCtx == null ? null : sesCtx.attributes()
                        );

                        messageService().send(nodeId, req);

                        // Avoid marshaling of the same parameters for other nodes.
                        if (parametersMarshalled == null)
                            parametersMarshalled = req.parametersMarshalled();
                    }
                    catch (Throwable e) {
                        // Any failure (Throwable, not only Exception) is reported to the query as the
                        // response of this node instead of being propagated to the caller. Assigning
                        // ex here makes the following nodes of this fragment take the error branch.
                        qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
                    }
                }
            }
        }

        // Attach the textual plan to the query in performance statistics, when they are enabled.
        if (perfStatProc.enabled()) {
            perfStatProc.queryProperty(
                GridCacheQueryType.SQL_FIELDS,
                qry.initiatorNodeId(),
                qry.localQueryId(),
                "Query plan",
                plan.textPlan()
            );
        }

        // Add the textual plan to the plan history, when the history tracker is enabled.
        if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
            ctx.query().runningQueryManager().planHistoryTracker().addPlan(
                plan.textPlan(),
                qry.sql(),
                qry.context().schemaName(),
                qry.context().isLocal(),
                CalciteQueryEngineConfiguration.ENGINE_NAME
            );
        }

        QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);

        // No field conversion when query properties are absent or keepBinary is set; otherwise each
        // field is passed through CacheObjectUtils.unwrapBinaryIfNeeded.
        Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null :
            o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null);

        HeavyQueriesTracker.ResultSetChecker resultSetChecker = ctx.query().runningQueryManager()
            .heavyQueriesTracker().resultSetChecker(qry);

        // Both converter variants return the row unchanged; they exist for their side effects only
        // (read event recording and the result set check on each fetched row).
        Function<List<Object>, List<Object>> rowConverter;

        // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return result to cursor.
        if (qryProps != null && qryProps.cacheName() != null && evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
            // Resolved once here and captured by the lambda, not re-evaluated per row.
            ClusterNode locNode = ctx.discovery().localNode();
            UUID subjId = SecurityUtils.securitySubjectId(ctx);

            rowConverter = row -> {
                evtMgr.record(new CacheQueryReadEvent<>(
                    locNode,
                    "SQL fields query result set row read.",
                    EVT_CACHE_QUERY_OBJECT_READ,
                    CacheQueryType.SQL_FIELDS.name(),
                    qryProps.cacheName(),
                    null,
                    qry.sql(),
                    null,
                    null,
                    qry.parameters(),
                    subjId,
                    null,
                    null,
                    null,
                    null,
                    row));

                resultSetChecker.checkOnFetchNext();

                return row;
            };
        }
        else {
            rowConverter = row -> {
                resultSetChecker.checkOnFetchNext();

                return row;
            };
        }

        // Close callback handed to the converting iterator: reports the fetched size to performance
        // statistics (when enabled at that moment) and then runs the result set check for close.
        Runnable onClose = () -> {
            if (perfStatProc.enabled()) {
                perfStatProc.queryRowsProcessed(
                    GridCacheQueryType.SQL_FIELDS,
                    qry.initiatorNodeId(),
                    qry.localQueryId(),
                    "Fetched",
                    resultSetChecker.fetchedSize()
                );
            }

            resultSetChecker.checkOnClose();
        };

        // Wrap the root query iterator (obtained through the iterators holder) with the field and row
        // converters and the close callback defined above.
        Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
            fieldConverter, rowConverter, onClose);

        // Make yet another tracking layer for cursor.getAll(), so tracking hierarchy will look like:
        // Row tracker -> Cursor memory tracker -> Query memory tracker -> Global memory tracker.
        // It's required, since query memory tracker can be closed concurrently during getAll() and
        // tracked data for cursor can be lost without additional tracker.
        MemoryTracker curMemoryTracker = QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());

        return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker);
    }