in server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java [360:551]
/**
 * Recursively rewrites {@code dataSource}, swapping subqueries for the datasource produced by
 * {@code toInlineDataSource} according to the following rules, checked in this order:
 *
 * <ol>
 * <li>Not a {@code QueryDataSource}: every child is processed recursively (with a null toolchest) and the
 * datasource is rebuilt through {@code withChildren}.</li>
 * <li>The subquery has a {@code QueryLogic} in the conglomerate: it is run through that logic's entry point and
 * its results are inlined using a {@code QueryLogicCompatToolChest}.</li>
 * <li>{@code toolChestIfOutermost} is non-null and reports it can perform the subquery: the chain of nested
 * {@code QueryDataSource}s is peeled off, whatever sits beneath is processed recursively, and the chain is
 * rebuilt. The rebuilt datasource is returned as-is if the subquery's own toolchest can still perform it;
 * otherwise it is run through this method again.</li>
 * <li>{@code canRunQueryUsingLocalWalker} or {@code canRunQueryUsingClusterWalker} accepts the subquery: it is
 * run with this walker and its results are inlined using the subquery's toolchest.</li>
 * <li>Otherwise: every child is processed recursively and the rebuilt datasource is run through this method
 * again.</li>
 * </ol>
 *
 * @param dataSource                        datasource to examine
 * @param toolChestIfOutermost              toolchest asked whether a subquery can be left in place; recursive calls
 *                                          on children made by this method always pass null
 * @param subqueryRowLimitAccumulator       forwarded unchanged to {@code toInlineDataSource} and recursive calls
 * @param subqueryMemoryLimitAccumulator    forwarded unchanged to {@code toInlineDataSource} and recursive calls
 * @param cannotMaterializeToFrames         read (together with {@code maxSubqueryMemory}) to pick the result
 *                                          serialization mode put into the subquery context; also forwarded
 * @param maxSubqueryRows                   forwarded unchanged to {@code toInlineDataSource} and recursive calls
 * @param maxSubqueryMemory                 see {@code cannotMaterializeToFrames}; also forwarded
 * @param useNestedForUnknownTypeInSubquery forwarded unchanged to {@code toInlineDataSource} and recursive calls
 * @param dryRun                            when true no subquery is executed: an empty sequence stands in for the
 *                                          results, and {@code false} is passed as the second-to-last argument
 *                                          of {@code toInlineDataSource}
 *
 * @return the rewritten datasource
 *
 * @throws ISE if the nested-query walk or rebuild ends on an unexpected datasource type
 */
private <T> DataSource inlineIfNecessary(
    final DataSource dataSource,
    @Nullable final QueryToolChest toolChestIfOutermost,
    final AtomicInteger subqueryRowLimitAccumulator,
    final AtomicLong subqueryMemoryLimitAccumulator,
    final AtomicBoolean cannotMaterializeToFrames,
    final int maxSubqueryRows,
    final long maxSubqueryMemory,
    final boolean useNestedForUnknownTypeInSubquery,
    final boolean dryRun
)
{
  if (!(dataSource instanceof QueryDataSource)) {
    // Not a subquery itself. Descend into each child so that anything nested below can still be inlined.
    final List<DataSource> rewrittenChildren = new ArrayList<>();
    for (DataSource child : dataSource.getChildren()) {
      rewrittenChildren.add(
          inlineIfNecessary(
              child,
              null,
              subqueryRowLimitAccumulator,
              subqueryMemoryLimitAccumulator,
              cannotMaterializeToFrames,
              maxSubqueryRows,
              maxSubqueryMemory,
              useNestedForUnknownTypeInSubquery,
              dryRun
          )
      );
    }
    return dataSource.withChildren(rewrittenChildren);
  }

  // From here on the datasource is known to wrap a subquery.
  final Query subQuery = ((QueryDataSource) dataSource).getQuery();
  final QueryToolChest toolChest = conglomerate.getToolChest(subQuery);
  final QueryLogic subQueryLogic = conglomerate.getQueryLogic(subQuery);

  if (subQueryLogic != null) {
    // The subquery is driven by a QueryLogic: execute through its entry point rather than a plain runner.
    final Sequence<?> logicResults;
    if (!dryRun) {
      final String serializationMode = ClientQuerySegmentWalkerUtils
          .getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())
          .serializationMode()
          .toString();
      final Query serializingSubQuery = subQuery.withOverriddenContext(
          Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, serializationMode)
      );
      logicResults = subQueryLogic
          .entryPoint(serializingSubQuery, this)
          .run(QueryPlus.wrap(serializingSubQuery), DirectDruidClient.makeResponseContextForQuery());
    } else {
      logicResults = Sequences.empty();
    }

    return toInlineDataSource(
        subQuery,
        logicResults,
        (QueryToolChest) new QueryLogicCompatToolChest(subQuery.getResultRowSignature()),
        subqueryRowLimitAccumulator,
        subqueryMemoryLimitAccumulator,
        cannotMaterializeToFrames,
        maxSubqueryRows,
        maxSubqueryMemory,
        useNestedForUnknownTypeInSubquery,
        subqueryStatsProvider,
        !dryRun,
        emitter
    );
  }

  if (toolChestIfOutermost != null && toolChestIfOutermost.canPerformSubquery(subQuery)) {
    // The outer toolchest can handle this subquery. Peel off the chain of nested query datasources, inline
    // whatever lies beneath them (e.g. subqueries nested under a join), then put the chain back together.
    final Stack<DataSource> peeledLayers = new Stack<>();
    DataSource cursor = dataSource;
    for (; cursor instanceof QueryDataSource; cursor = Iterables.getOnlyElement(cursor.getChildren())) {
      peeledLayers.push(cursor);
    }

    // Sanity check on the walk above.
    if (cursor instanceof QueryDataSource) {
      throw new ISE("Got a QueryDataSource[%s], should've walked it away in the loop above.", cursor);
    }

    cursor = inlineIfNecessary(
        cursor,
        null,
        subqueryRowLimitAccumulator,
        subqueryMemoryLimitAccumulator,
        cannotMaterializeToFrames,
        maxSubqueryRows,
        maxSubqueryMemory,
        useNestedForUnknownTypeInSubquery,
        dryRun
    );

    // Re-wrap from the innermost layer outwards.
    while (!peeledLayers.isEmpty()) {
      cursor = peeledLayers.pop().withChildren(Collections.singletonList(cursor));
    }

    if (!(cursor instanceof QueryDataSource)) {
      throw new ISE("Should have a QueryDataSource, but got[%s] instead", cursor);
    }

    // NOTE(review): this re-check asks the subquery's own toolchest, whereas the branch guard above asked
    // toolChestIfOutermost. Confirm that the difference is intentional.
    final Query rebuiltSubQuery = ((QueryDataSource) cursor).getQuery();
    if (!toolChest.canPerformSubquery(rebuiltSubQuery)) {
      // Inlining changed things enough that the toolchest can no longer handle this subquery, so it has to be
      // considered for inlining after all.
      return inlineIfNecessary(
          cursor,
          toolChestIfOutermost,
          subqueryRowLimitAccumulator,
          subqueryMemoryLimitAccumulator,
          cannotMaterializeToFrames,
          maxSubqueryRows,
          maxSubqueryMemory,
          useNestedForUnknownTypeInSubquery,
          dryRun
      );
    }
    return cursor;
  }

  if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) {
    // The subquery is runnable by one of the walkers: execute it (unless this is a dry run) and inline the results.
    final Sequence<?> walkerResults;
    if (!dryRun) {
      final String serializationMode = ClientQuerySegmentWalkerUtils
          .getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())
          .serializationMode()
          .toString();
      final Query serializingSubQuery = subQuery.withOverriddenContext(
          Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, serializationMode)
      );
      walkerResults = serializingSubQuery
          .getRunner(this)
          .run(QueryPlus.wrap(serializingSubQuery), DirectDruidClient.makeResponseContextForQuery());
    } else {
      walkerResults = Sequences.empty();
    }

    return toInlineDataSource(
        subQuery,
        walkerResults,
        toolChest,
        subqueryRowLimitAccumulator,
        subqueryMemoryLimitAccumulator,
        cannotMaterializeToFrames,
        maxSubqueryRows,
        maxSubqueryMemory,
        useNestedForUnknownTypeInSubquery,
        subqueryStatsProvider,
        !dryRun,
        emitter
    );
  }

  // The subquery cannot be inlined as it stands. Inline one level deeper, then try again on the rebuilt datasource.
  final List<DataSource> deeperInlined = dataSource
      .getChildren()
      .stream()
      .map(child -> inlineIfNecessary(
          child,
          null,
          subqueryRowLimitAccumulator,
          subqueryMemoryLimitAccumulator,
          cannotMaterializeToFrames,
          maxSubqueryRows,
          maxSubqueryMemory,
          useNestedForUnknownTypeInSubquery,
          dryRun
      ))
      .collect(Collectors.toList());

  return inlineIfNecessary(
      dataSource.withChildren(deeperInlined),
      toolChestIfOutermost,
      subqueryRowLimitAccumulator,
      subqueryMemoryLimitAccumulator,
      cannotMaterializeToFrames,
      maxSubqueryRows,
      maxSubqueryMemory,
      useNestedForUnknownTypeInSubquery,
      dryRun
  );
}