in modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java [329:652]
private void onQueryRequest0(
final ClusterNode node,
final long qryId,
final long reqId,
final int segmentId,
final String schemaName,
final Collection<GridCacheSqlQuery> qrys,
final List<Integer> cacheIds,
final AffinityTopologyVersion topVer,
final Map<UUID, int[]> partsMap,
final int[] parts,
final int pageSize,
final boolean distributedJoins,
final boolean enforceJoinOrder,
final boolean replicated,
final int timeout,
final Object[] params,
boolean lazy,
Boolean dataPageScanEnabled,
boolean treatReplicatedAsPartitioned
) {
boolean performanceStatsEnabled = ctx.performanceStatistics().enabled();
if (performanceStatsEnabled)
IoStatisticsQueryHelper.startGatheringQueryStatistics();
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx = !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0))
: null;
MapNodeResults nodeRess = resultsForNode(node.id());
MapQueryResults qryResults = null;
PartitionReservation reserved = null;
QueryContext qctx = null;
// We don't use try with resources on purpose - the catch block must also be executed in the context of this span.
TraceSurroundings trace = MTC.support(ctx.tracing()
.create(SQL_QRY_EXEC_REQ, MTC.span())
.addTag(SQL_QRY_TEXT, () ->
qrys.stream().map(GridCacheSqlQuery::query).collect(Collectors.joining("; "))));
try {
if (topVer != null) {
// Reserve primary for topology version or explicit partitions.
reserved = h2.partitionReservationManager().reservePartitions(
cacheIds,
topVer,
parts,
node.id(),
reqId
);
if (reserved.failed()) {
sendRetry(node, reqId, segmentId, reserved.error());
return;
}
}
// Prepare query context.
DistributedJoinContext distributedJoinCtx = null;
if (distributedJoins && !replicated) {
distributedJoinCtx = new DistributedJoinContext(
topVer,
partsMap,
node.id(),
reqId,
segmentId,
pageSize
);
}
qctx = new QueryContext(
segmentId,
h2.backupFilter(topVer, parts, treatReplicatedAsPartitioned),
distributedJoinCtx,
reserved,
true);
qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, lazy, qctx);
// qctx is set, we have to release reservations inside of it.
reserved = null;
if (distributedJoinCtx != null)
qryCtxRegistry.setShared(node.id(), reqId, qctx);
if (nodeRess.put(reqId, segmentId, qryResults) != null)
throw new IllegalStateException();
if (nodeRess.cancelled(reqId)) {
qryCtxRegistry.clearShared(node.id(), reqId);
nodeRess.cancelRequest(reqId);
throw new QueryCancelledException();
}
// Run queries.
int qryIdx = 0;
boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
for (GridCacheSqlQuery qry : qrys) {
H2PooledConnection conn = h2.connections().connection(schemaName);
H2Utils.setupConnection(
conn,
qctx,
distributedJoins,
enforceJoinOrder,
lazy
);
MapQueryResult res = new MapQueryResult(h2, mainCctx, node.id(), qry, params, conn, log);
qryResults.addResult(qryIdx, res);
MapH2QueryInfo qryInfo = null;
try {
res.lock();
// Ensure we are on the target node for this replicated query.
if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
String sql = qry.query();
Collection<Object> params0 = F.asList(qry.parameters(params));
PreparedStatement stmt = conn.prepareStatement(sql, H2StatementCache.queryFlags(
distributedJoins,
enforceJoinOrder));
H2Utils.bindParameters(stmt, params0);
qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);
h2.heavyQueriesTracker().startTracking(qryInfo);
if (performanceStatsEnabled) {
ctx.performanceStatistics().queryProperty(
GridCacheQueryType.SQL_FIELDS,
qryInfo.nodeId(),
qryInfo.queryId(),
"Map phase plan",
qryInfo.plan()
);
}
GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx);
ResultSet rs = h2.executeWithResumableTimeTracking(
() -> h2.executeSqlQueryWithTimer(
stmt,
conn,
sql,
timeout,
qryCancel,
dataPageScanEnabled,
null
),
qryInfo
);
if (h2.runningQueryManager().planHistoryTracker().enabled()) {
MapH2QueryInfo qryInfo0 = qryInfo;
ctx.pools().getSystemExecutorService().submit(() ->
h2.runningQueryManager().planHistoryTracker().addPlan(
qryInfo0.plan(),
qryInfo0.sql(),
qryInfo0.schema(),
false,
IndexingQueryEngineConfiguration.ENGINE_NAME));
}
if (evt) {
ctx.event().record(new CacheQueryExecutedEvent<>(
node,
"SQL query executed.",
EVT_CACHE_QUERY_EXECUTED,
CacheQueryType.SQL.name(),
mainCctx.name(),
null,
qry.query(),
null,
null,
params,
node.id(),
null));
}
assert rs instanceof JdbcResultSet : rs.getClass();
if (qryResults.cancelled()) {
rs.close();
throw new QueryCancelledException();
}
res.openResult(rs, qryInfo);
MapQueryResults qryResults0 = qryResults;
int qryIdx0 = qryIdx;
final GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking(
() -> prepareNextPage(
nodeRess,
node,
qryResults0,
qryIdx0,
segmentId,
pageSize,
dataPageScanEnabled
),
qryInfo
);
if (msg != null)
sendNextPage(node, msg);
}
else {
assert !qry.isPartitioned();
qryResults.closeResult(qryIdx);
}
qryIdx++;
}
catch (Throwable e) {
if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, e);
throw e;
}
finally {
try {
res.unlockTables();
}
finally {
res.unlock();
}
}
} // for map queries
if (!lazy)
qryResults.releaseQueryContext();
}
catch (Throwable e) {
if (qryResults != null) {
nodeRess.remove(reqId, segmentId, qryResults);
qryResults.close();
// If a query is cancelled before execution is started partitions have to be released.
if (!lazy || !qryResults.isAllClosed())
qryResults.releaseQueryContext();
}
else
releaseReservations(qctx);
if (e instanceof QueryCancelledException)
sendError(node, reqId, e);
else {
SQLException sqlEx = X.cause(e, SQLException.class);
if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
sendQueryCancel(node, reqId);
else {
GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
if (retryErr != null) {
final String retryCause = String.format(
"Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
"errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
);
sendRetry(node, reqId, segmentId, retryCause);
}
else {
QueryRetryException qryRetryErr = X.cause(e, QueryRetryException.class);
if (qryRetryErr != null)
sendError(node, reqId, qryRetryErr);
else {
if (e instanceof Error) {
U.error(log, "Failed to execute local query.", e);
throw (Error)e;
}
U.warn(log, "Failed to execute local query.", e);
sendError(node, reqId, e);
}
}
}
}
}
finally {
if (reserved != null)
reserved.release();
if (trace != null)
trace.close();
if (performanceStatsEnabled) {
IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();
if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
ctx.performanceStatistics().queryReads(
GridCacheQueryType.SQL_FIELDS,
node.id(),
qryId,
stat.logicalReads(),
stat.physicalReads());
}
}
}
}