in server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java [182:402]
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
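// Analyze the query up front; the execution vertex provides the base table (for the sanity check below)
// and the segment-mapping function.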
final ExecutionVertex ev = ExecutionVertex.of(query);
// We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
final DataSource dataSourceFromQuery = query.getDataSource();
// Sanity check: make sure the query is based on the table we're meant to handle.
if (!ev.getBaseTableDataSource().getName().equals(dataSource)) {
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
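// Context flag allowing callers to skip the in-memory (incremental) hydrant when acquiring segment references below.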
final boolean skipIncrementalSegment = query.context().getBoolean(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
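// Accumulates CPU time spent building the segment-map function and running the query; reported via CPUTimeMetricQueryRunner.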
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
// Make sure this query type can handle the subquery, if present.
if ((dataSourceFromQuery instanceof QueryDataSource)
&& !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<SegmentReference, SegmentReference> segmentMapFn = JvmUtils.safeAccumulateThreadCpuTime(
cpuTimeAccumulator,
() -> ev.createSegmentMapFunction(policyEnforcer)
);
// Compute the datasource's cache key prefix once here so it doesn't need to be re-computed for every segment
final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
// We need to report data for each Sink all-or-nothing, which means we need to acquire references for all
// subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a
// query fails or is canceled, we acquire *all* sink references upfront, and release them all when the main
// QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of
// each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.)
final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
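// Per-descriptor bookkeeping: the sink's SegmentId (needed for bySegment results) and the runners built for it.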
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
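// Shared accumulator for per-sink segment metrics, populated by the SinkMetricsEmittingQueryRunners created below.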
final ConcurrentHashMap<String, SinkMetricsEmittingQueryRunner.SegmentMetrics> segmentMetricsAccumulator = new ConcurrentHashMap<>();
try {
for (final SegmentDescriptor descriptor : specs) {
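// Find the sink holding this descriptor's interval, version, and partition number.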
final PartitionChunk<SinkHolder> chunk = upgradedSegmentsTimeline.findChunk(
descriptor.getInterval(),
descriptor.getVersion(),
descriptor.getPartitionNumber()
);
if (chunk == null) {
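// No sink in the timeline for this descriptor; report the segment as missing.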
allRunners.put(
descriptor,
Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
);
continue;
}
final Sink theSink = chunk.getObject().sink;
final SegmentId sinkSegmentId = theSink.getSegment().getId();
segmentIdMap.put(descriptor, sinkSegmentId);
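// Acquire references to all of the sink's hydrants at once (all-or-nothing, per the comment above).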
final List<SinkSegmentReference> sinkSegmentReferences =
theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
if (sinkSegmentReferences == null) {
// We failed to acquire references for all subsegments. Bail and report the entire sink missing.
allRunners.put(
descriptor,
Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
);
} else if (sinkSegmentReferences.isEmpty()) {
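// Nothing queryable in this sink (e.g. only an incremental hydrant that was skipped); return no results for it.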
allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>()));
} else {
allSegmentReferences.addAll(sinkSegmentReferences);
allRunners.put(
descriptor,
sinkSegmentReferences.stream().map(
segmentReference -> {
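// Base runner for this hydrant, wrapped to accumulate per-segment query metrics under the sink's segment id.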
QueryRunner<T> runner = new SinkMetricsEmittingQueryRunner<>(
emitter,
factory.getToolchest(),
factory.createRunner(segmentReference.getSegment()),
segmentMetricsAccumulator,
SEGMENT_QUERY_METRIC,
sinkSegmentId.toString()
);
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, so only use the cache if it is local
if (segmentReference.isImmutable() && cache.isLocal()) {
final SegmentReference segment = segmentReference.getSegment();
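// Prefer the hydrant's actual min/max time range for the cache key; fall back to the full data interval.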
final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
final Interval cacheKeyInterval;
if (timeBoundaryInspector != null) {
cacheKeyInterval = timeBoundaryInspector.getMinMaxInterval();
} else {
cacheKeyInterval = segment.getDataInterval();
}
runner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
cacheKeyPrefix,
descriptor,
cacheKeyInterval,
objectMapper,
cache,
toolChest,
runner,
// Always populate in foreground regardless of config
new ForegroundCachePopulator(
objectMapper,
cachePopulatorStats,
cacheConfig.getMaxEntrySize()
),
cacheConfig
);
}
// Regardless of whether caching is enabled, accumulate segment cache/wait metrics outside the
// *possible* caching layer.
runner = new SinkMetricsEmittingQueryRunner<>(
emitter,
factory.getToolchest(),
runner,
segmentMetricsAccumulator,
SEGMENT_CACHE_AND_WAIT_METRICS,
sinkSegmentId.toString()
);
// Emit CPU time metrics.
runner = CPUTimeMetricQueryRunner.safeBuild(
runner,
toolChest,
emitter,
cpuTimeAccumulator,
false
);
// Run with specific segment descriptor.
runner = new SpecificSegmentQueryRunner<>(
runner,
new SpecificSegmentSpec(descriptor)
);
return runner;
}
).collect(Collectors.toList())
);
}
}
final QueryRunner<T> mergedRunner;
if (query.context().isBySegment()) {
// bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the
// same segment together, but causes additional memory usage due to the extra layer of materialization,
// so we only do this if we need to.
mergedRunner = factory.mergeRunners(
queryProcessingPool,
allRunners.entrySet().stream().map(
entry -> new BySegmentQueryRunner<>(
segmentIdMap.get(entry.getKey()),
entry.getKey().getInterval().getStart(),
factory.mergeRunners(
DirectQueryProcessingPool.INSTANCE,
entry.getValue()
)
)
).collect(Collectors.toList())
);
} else {
// Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment).
mergedRunner = factory.mergeRunners(
queryProcessingPool,
new SinkQueryRunners<>(
allRunners.entrySet().stream().flatMap(
entry ->
entry.getValue().stream().map(
runner ->
Pair.of(entry.getKey().getInterval(), runner)
)
).collect(Collectors.toList()))
);
}
// 1) Populate resource id to the query
// 2) Merge results using the toolChest, finalize if necessary.
// 3) Measure CPU time of that operation.
// 4) Release all sink segment references.
return new ResourceIdPopulatingQueryRunner<>(
QueryRunnerHelper.makeClosingQueryRunner(
CPUTimeMetricQueryRunner.safeBuild(
new SinkMetricsEmittingQueryRunner<>(
emitter,
toolChest,
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(mergedRunner, true),
toolChest
),
segmentMetricsAccumulator,
Collections.emptySet(),
null
),
toolChest,
emitter,
cpuTimeAccumulator,
true
),
() -> CloseableUtils.closeAll(allSegmentReferences)
)
);
}
catch (Throwable e) {
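// Setup failed partway through; release any references already acquired, then rethrow.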
throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences));
}
}