private void onQueryRequest0()

in modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java [329:652]


    private void onQueryRequest0(
        final ClusterNode node,
        final long qryId,
        final long reqId,
        final int segmentId,
        final String schemaName,
        final Collection<GridCacheSqlQuery> qrys,
        final List<Integer> cacheIds,
        final AffinityTopologyVersion topVer,
        final Map<UUID, int[]> partsMap,
        final int[] parts,
        final int pageSize,
        final boolean distributedJoins,
        final boolean enforceJoinOrder,
        final boolean replicated,
        final int timeout,
        final Object[] params,
        boolean lazy,
        Boolean dataPageScanEnabled,
        boolean treatReplicatedAsPartitioned
    ) {
        boolean performanceStatsEnabled = ctx.performanceStatistics().enabled();

        if (performanceStatsEnabled)
            IoStatisticsQueryHelper.startGatheringQueryStatistics();

        // Prepare to run queries.
        GridCacheContext<?, ?> mainCctx = !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0))
            : null;

        MapNodeResults nodeRess = resultsForNode(node.id());

        MapQueryResults qryResults = null;

        PartitionReservation reserved = null;

        QueryContext qctx = null;

        // We don't use try with resources on purpose - the catch block must also be executed in the context of this span.
        TraceSurroundings trace = MTC.support(ctx.tracing()
            .create(SQL_QRY_EXEC_REQ, MTC.span())
            .addTag(SQL_QRY_TEXT, () ->
                qrys.stream().map(GridCacheSqlQuery::query).collect(Collectors.joining("; "))));

        try {
            if (topVer != null) {
                // Reserve primary for topology version or explicit partitions.
                reserved = h2.partitionReservationManager().reservePartitions(
                    cacheIds,
                    topVer,
                    parts,
                    node.id(),
                    reqId
                );

                if (reserved.failed()) {
                    sendRetry(node, reqId, segmentId, reserved.error());

                    return;
                }
            }

            // Prepare query context.
            DistributedJoinContext distributedJoinCtx = null;

            if (distributedJoins && !replicated) {
                distributedJoinCtx = new DistributedJoinContext(
                    topVer,
                    partsMap,
                    node.id(),
                    reqId,
                    segmentId,
                    pageSize
                );
            }

            qctx = new QueryContext(
                segmentId,
                h2.backupFilter(topVer, parts, treatReplicatedAsPartitioned),
                distributedJoinCtx,
                reserved,
                true);

            qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, lazy, qctx);

            // qctx is set, we have to release reservations inside of it.
            reserved = null;

            if (distributedJoinCtx != null)
                qryCtxRegistry.setShared(node.id(), reqId, qctx);

            if (nodeRess.put(reqId, segmentId, qryResults) != null)
                throw new IllegalStateException();

            if (nodeRess.cancelled(reqId)) {
                qryCtxRegistry.clearShared(node.id(), reqId);

                nodeRess.cancelRequest(reqId);

                throw new QueryCancelledException();
            }

            // Run queries.
            int qryIdx = 0;

            boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);

            for (GridCacheSqlQuery qry : qrys) {
                H2PooledConnection conn = h2.connections().connection(schemaName);

                H2Utils.setupConnection(
                    conn,
                    qctx,
                    distributedJoins,
                    enforceJoinOrder,
                    lazy
                );

                MapQueryResult res = new MapQueryResult(h2, mainCctx, node.id(), qry, params, conn, log);

                qryResults.addResult(qryIdx, res);

                MapH2QueryInfo qryInfo = null;

                try {
                    res.lock();

                    // Ensure we are on the target node for this replicated query.
                    if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
                        String sql = qry.query();
                        Collection<Object> params0 = F.asList(qry.parameters(params));

                        PreparedStatement stmt = conn.prepareStatement(sql, H2StatementCache.queryFlags(
                            distributedJoins,
                            enforceJoinOrder));

                        H2Utils.bindParameters(stmt, params0);

                        qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);

                        h2.heavyQueriesTracker().startTracking(qryInfo);

                        if (performanceStatsEnabled) {
                            ctx.performanceStatistics().queryProperty(
                                GridCacheQueryType.SQL_FIELDS,
                                qryInfo.nodeId(),
                                qryInfo.queryId(),
                                "Map phase plan",
                                qryInfo.plan()
                            );
                        }

                        GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx);

                        ResultSet rs = h2.executeWithResumableTimeTracking(
                            () -> h2.executeSqlQueryWithTimer(
                                stmt,
                                conn,
                                sql,
                                timeout,
                                qryCancel,
                                dataPageScanEnabled,
                                null
                            ),
                            qryInfo
                        );

                        if (h2.runningQueryManager().planHistoryTracker().enabled()) {
                            MapH2QueryInfo qryInfo0 = qryInfo;

                            ctx.pools().getSystemExecutorService().submit(() ->
                                h2.runningQueryManager().planHistoryTracker().addPlan(
                                    qryInfo0.plan(),
                                    qryInfo0.sql(),
                                    qryInfo0.schema(),
                                    false,
                                    IndexingQueryEngineConfiguration.ENGINE_NAME));
                        }

                        if (evt) {
                            ctx.event().record(new CacheQueryExecutedEvent<>(
                                node,
                                "SQL query executed.",
                                EVT_CACHE_QUERY_EXECUTED,
                                CacheQueryType.SQL.name(),
                                mainCctx.name(),
                                null,
                                qry.query(),
                                null,
                                null,
                                params,
                                node.id(),
                                null));
                        }

                        assert rs instanceof JdbcResultSet : rs.getClass();

                        if (qryResults.cancelled()) {
                            rs.close();

                            throw new QueryCancelledException();
                        }

                        res.openResult(rs, qryInfo);

                        MapQueryResults qryResults0 = qryResults;

                        int qryIdx0 = qryIdx;

                        final GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking(
                            () -> prepareNextPage(
                                nodeRess,
                                node,
                                qryResults0,
                                qryIdx0,
                                segmentId,
                                pageSize,
                                dataPageScanEnabled
                            ),
                            qryInfo
                        );

                        if (msg != null)
                            sendNextPage(node, msg);
                    }
                    else {
                        assert !qry.isPartitioned();

                        qryResults.closeResult(qryIdx);
                    }

                    qryIdx++;
                }
                catch (Throwable e) {
                    if (qryInfo != null)
                        h2.heavyQueriesTracker().stopTracking(qryInfo, e);

                    throw e;
                }
                finally {
                    try {
                        res.unlockTables();
                    }
                    finally {
                        res.unlock();
                    }
                }
            } // for map queries

            if (!lazy)
                qryResults.releaseQueryContext();
        }
        catch (Throwable e) {
            if (qryResults != null) {
                nodeRess.remove(reqId, segmentId, qryResults);

                qryResults.close();

                // If a query is cancelled before execution is started partitions have to be released.
                if (!lazy || !qryResults.isAllClosed())
                    qryResults.releaseQueryContext();
            }
            else
                releaseReservations(qctx);

            if (e instanceof QueryCancelledException)
                sendError(node, reqId, e);
            else {
                SQLException sqlEx = X.cause(e, SQLException.class);

                if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
                    sendQueryCancel(node, reqId);
                else {
                    GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);

                    if (retryErr != null) {
                        final String retryCause = String.format(
                            "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
                                "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
                        );

                        sendRetry(node, reqId, segmentId, retryCause);
                    }
                    else {
                        QueryRetryException qryRetryErr = X.cause(e, QueryRetryException.class);

                        if (qryRetryErr != null)
                            sendError(node, reqId, qryRetryErr);
                        else {
                            if (e instanceof Error) {
                                U.error(log, "Failed to execute local query.", e);

                                throw (Error)e;
                            }

                            U.warn(log, "Failed to execute local query.", e);

                            sendError(node, reqId, e);
                        }
                    }
                }
            }
        }
        finally {
            if (reserved != null)
                reserved.release();

            if (trace != null)
                trace.close();

            if (performanceStatsEnabled) {
                IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();

                if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
                    ctx.performanceStatistics().queryReads(
                        GridCacheQueryType.SQL_FIELDS,
                        node.id(),
                        qryId,
                        stat.logicalReads(),
                        stat.physicalReads());
                }
            }
        }
    }