in modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java [578:794]
private ListFieldsQueryCursor<?> mapAndExecutePlan(
RootQuery<Row> qry,
MultiStepPlan plan
) {
qry.mapping();
Map<String, Object> qryParams = Commons.parametersMap(qry.parameters());
MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context(), qryParams);
ExecutionPlan execPlan = plan.init(mappingSvc, partSvc, mapCtx);
List<Fragment> fragments = execPlan.fragments();
if (ctx.security().enabled()) {
for (Fragment fragment : fragments)
checkPermissions(fragment.root());
}
// Local execution
Fragment fragment = F.first(fragments);
if (U.assertionsEnabled()) {
assert fragment != null;
FragmentMapping mapping = execPlan.mapping(fragment);
assert mapping != null;
List<UUID> nodes = mapping.nodeIds();
assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
}
long timeout = qry.remainingTime();
if (timeout == 0) {
throw new IgniteSQLException("The query was cancelled due to timeout", IgniteQueryErrorCode.QUERY_CANCELED,
new QueryCancelledException());
}
FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));
MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context());
ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
taskExecutor(),
injectSvc,
qry.id(),
locNodeId,
locNodeId,
mapCtx.topologyVersion(),
fragmentDesc,
handler,
qryMemoryTracker,
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
qryParams,
userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));
Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
Map<UUID, Long> fragmentsPerNode = fragments.stream()
.skip(1)
.flatMap(f -> f.mapping().nodeIds().stream())
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
// Start remote execution.
for (int i = 1; i < fragments.size(); i++) {
fragment = fragments.get(i);
fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));
Throwable ex = null;
byte[] parametersMarshalled = null;
for (UUID nodeId : fragmentDesc.nodeIds()) {
if (ex != null)
qry.onResponse(nodeId, fragment.fragmentId(), ex);
else {
try {
SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class);
QueryStartRequest req = new QueryStartRequest(
qry.id(),
qry.localQueryId(),
qry.context().schemaName(),
fragment.serialized(),
ectx.topologyVersion(),
fragmentDesc,
fragmentsPerNode.get(nodeId).intValue(),
qry.parameters(),
parametersMarshalled,
timeout,
ectx.getQryTxEntries(),
sesCtx == null ? null : sesCtx.attributes()
);
messageService().send(nodeId, req);
// Avoid marshaling of the same parameters for other nodes.
if (parametersMarshalled == null)
parametersMarshalled = req.parametersMarshalled();
}
catch (Throwable e) {
qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
}
}
}
}
if (perfStatProc.enabled()) {
perfStatProc.queryProperty(
GridCacheQueryType.SQL_FIELDS,
qry.initiatorNodeId(),
qry.localQueryId(),
"Query plan",
plan.textPlan()
);
}
if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
ctx.query().runningQueryManager().planHistoryTracker().addPlan(
plan.textPlan(),
qry.sql(),
qry.context().schemaName(),
qry.context().isLocal(),
CalciteQueryEngineConfiguration.ENGINE_NAME
);
}
QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null :
o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null);
HeavyQueriesTracker.ResultSetChecker resultSetChecker = ctx.query().runningQueryManager()
.heavyQueriesTracker().resultSetChecker(qry);
Function<List<Object>, List<Object>> rowConverter;
// Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return result to cursor.
if (qryProps != null && qryProps.cacheName() != null && evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
ClusterNode locNode = ctx.discovery().localNode();
UUID subjId = SecurityUtils.securitySubjectId(ctx);
rowConverter = row -> {
evtMgr.record(new CacheQueryReadEvent<>(
locNode,
"SQL fields query result set row read.",
EVT_CACHE_QUERY_OBJECT_READ,
CacheQueryType.SQL_FIELDS.name(),
qryProps.cacheName(),
null,
qry.sql(),
null,
null,
qry.parameters(),
subjId,
null,
null,
null,
null,
row));
resultSetChecker.checkOnFetchNext();
return row;
};
}
else {
rowConverter = row -> {
resultSetChecker.checkOnFetchNext();
return row;
};
}
Runnable onClose = () -> {
if (perfStatProc.enabled()) {
perfStatProc.queryRowsProcessed(
GridCacheQueryType.SQL_FIELDS,
qry.initiatorNodeId(),
qry.localQueryId(),
"Fetched",
resultSetChecker.fetchedSize()
);
}
resultSetChecker.checkOnClose();
};
Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
fieldConverter, rowConverter, onClose);
// Make yet another tracking layer for cursor.getAll(), so tracking hierarchy will look like:
// Row tracker -> Cursor memory tracker -> Query memory tracker -> Global memory tracker.
// It's required, since query memory tracker can be closed concurrently during getAll() and
// tracked data for cursor can be lost without additional tracker.
MemoryTracker curMemoryTracker = QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker);
}