public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)

in server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java [182:402]


  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
  {
    ExecutionVertex ev = ExecutionVertex.of(query);
    // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
    final DataSource dataSourceFromQuery = query.getDataSource();

    // Sanity check: make sure the query is based on the table we're meant to handle.
    if (!ev.getBaseTableDataSource().getName().equals(dataSource)) {
      throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
    }

    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
    if (factory == null) {
      throw new ISE("Unknown query type[%s].", query.getClass());
    }

    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
    final boolean skipIncrementalSegment = query.context().getBoolean(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
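    // Accumulates CPU time spent building and running the query; accumulated by the inner runners and
    // reported once by the outermost CPUTimeMetricQueryRunner below.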
    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

    // Make sure this query type can handle the subquery, if present.
    if ((dataSourceFromQuery instanceof QueryDataSource)
        && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
      throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
    }

    // segmentMapFn maps each base Segment into a joined Segment if necessary.
    final Function<SegmentReference, SegmentReference> segmentMapFn = JvmUtils.safeAccumulateThreadCpuTime(
        cpuTimeAccumulator,
        () -> ev.createSegmentMapFunction(policyEnforcer)
    );

    // We compute the data source's cache key (e.g. for joins) up front so it doesn't need to be re-computed
    // for every segment.
    final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());

    // We need to report data for each Sink all-or-nothing, which means we need to acquire references for all
    // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a
    // query fails or is canceled, we acquire *all* sink references upfront, and release them all when the main
    // QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of
    // each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.)
    final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
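    // Bookkeeping for the loop below: the Sink's SegmentId per descriptor (used for bySegment merging),
    // the per-hydrant runners per descriptor, and a shared accumulator for per-segment metrics.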
    final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
    final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
    final ConcurrentHashMap<String, SinkMetricsEmittingQueryRunner.SegmentMetrics> segmentMetricsAccumulator = new ConcurrentHashMap<>();

    try {
      for (final SegmentDescriptor descriptor : specs) {
        final PartitionChunk<SinkHolder> chunk = upgradedSegmentsTimeline.findChunk(
            descriptor.getInterval(),
            descriptor.getVersion(),
            descriptor.getPartitionNumber()
        );

        if (chunk == null) {
          allRunners.put(
              descriptor,
              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
          );
          continue;
        }

        final Sink theSink = chunk.getObject().sink;
        final SegmentId sinkSegmentId = theSink.getSegment().getId();
        segmentIdMap.put(descriptor, sinkSegmentId);
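        // Acquire references to all of the Sink's FireHydrants in one all-or-nothing step.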
        final List<SinkSegmentReference> sinkSegmentReferences =
            theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);

        if (sinkSegmentReferences == null) {
          // We failed to acquire references for all subsegments. Bail out and report the entire sink as missing.
          allRunners.put(
              descriptor,
              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
          );
        } else if (sinkSegmentReferences.isEmpty()) {
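          // References were acquired, but there is nothing queryable in this sink (e.g. its only hydrant is
          // an incremental segment and skipIncrementalSegment is set), so return an empty result.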
          allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>()));
        } else {
          allSegmentReferences.addAll(sinkSegmentReferences);

          allRunners.put(
              descriptor,
              sinkSegmentReferences.stream().map(
                  segmentReference -> {
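                    // Innermost runner: query this hydrant and record per-hydrant query metrics under the
                    // Sink's segment id.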
                    QueryRunner<T> runner = new SinkMetricsEmittingQueryRunner<>(
                        emitter,
                        factory.getToolchest(),
                        factory.createRunner(segmentReference.getSegment()),
                        segmentMetricsAccumulator,
                        SEGMENT_QUERY_METRIC,
                        sinkSegmentId.toString()
                    );

                    // 1) Only use caching if the data is immutable.
                    // 2) Hydrants are not the same across replicas, so only use a local cache.
                    if (segmentReference.isImmutable() && cache.isLocal()) {
                      final SegmentReference segment = segmentReference.getSegment();
                      final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
                      final Interval cacheKeyInterval;

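                      // Prefer the inspector's tight min/max data interval when available; it yields a more
                      // precise cache key than the segment's full declared interval.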
                      if (timeBoundaryInspector != null) {
                        cacheKeyInterval = timeBoundaryInspector.getMinMaxInterval();
                      } else {
                        cacheKeyInterval = segment.getDataInterval();
                      }

                      runner = new CachingQueryRunner<>(
                          makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
                          cacheKeyPrefix,
                          descriptor,
                          cacheKeyInterval,
                          objectMapper,
                          cache,
                          toolChest,
                          runner,
                          // Always populate in foreground regardless of config
                          new ForegroundCachePopulator(
                              objectMapper,
                              cachePopulatorStats,
                              cacheConfig.getMaxEntrySize()
                          ),
                          cacheConfig
                      );
                    }

                    // Regardless of whether caching is enabled, emit the segment cache-and-wait metrics outside
                    // the *possible* caching layer.
                    runner = new SinkMetricsEmittingQueryRunner<>(
                        emitter,
                        factory.getToolchest(),
                        runner,
                        segmentMetricsAccumulator,
                        SEGMENT_CACHE_AND_WAIT_METRICS,
                        sinkSegmentId.toString()
                    );

                    // Emit CPU time metrics.
                    runner = CPUTimeMetricQueryRunner.safeBuild(
                        runner,
                        toolChest,
                        emitter,
                        cpuTimeAccumulator,
                        false
                    );

                    // Run with specific segment descriptor.
                    runner = new SpecificSegmentQueryRunner<>(
                        runner,
                        new SpecificSegmentSpec(descriptor)
                    );

                    return runner;
                  }
              ).collect(Collectors.toList())
          );
        }
      }

      final QueryRunner<T> mergedRunner;

      if (query.context().isBySegment()) {
        // bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the
        // same segment together, but causes additional memory usage due to the extra layer of materialization,
        // so we only do this if we need to.
        mergedRunner = factory.mergeRunners(
            queryProcessingPool,
            allRunners.entrySet().stream().map(
                entry -> new BySegmentQueryRunner<>(
                    segmentIdMap.get(entry.getKey()),
                    entry.getKey().getInterval().getStart(),
                    factory.mergeRunners(
                        DirectQueryProcessingPool.INSTANCE,
                        entry.getValue()
                    )
                )
            ).collect(Collectors.toList())
        );
      } else {
        // Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment).
        mergedRunner = factory.mergeRunners(
            queryProcessingPool,
            new SinkQueryRunners<>(
                allRunners.entrySet().stream().flatMap(
                    entry ->
                        entry.getValue().stream().map(
                            runner ->
                                Pair.of(entry.getKey().getInterval(), runner)
                        )
                ).collect(Collectors.toList()))
        );
      }

      // 1) Populate the query's resource id.
      // 2) Merge results using the toolChest, finalize if necessary.
      // 3) Measure CPU time of that operation.
      // 4) Release all sink segment references.
      return new ResourceIdPopulatingQueryRunner<>(
          QueryRunnerHelper.makeClosingQueryRunner(
              CPUTimeMetricQueryRunner.safeBuild(
                  new SinkMetricsEmittingQueryRunner<>(
                      emitter,
                      toolChest,
                      new FinalizeResultsQueryRunner<>(
                          toolChest.mergeResults(mergedRunner, true),
                          toolChest
                      ),
                      segmentMetricsAccumulator,
                      Collections.emptySet(),
                      null
                  ),
                  toolChest,
                  emitter,
                  cpuTimeAccumulator,
                  true
              ),
              () -> CloseableUtils.closeAll(allSegmentReferences)
          )
      );
    }
    catch (Throwable e) {
      throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences));
    }
  }
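
For context, a minimal caller-side sketch of driving this method. This is an illustration, not part of the class: `runAgainstSinks`, `walker`, and `descriptors` are hypothetical names, and the imports (`org.apache.druid.query.QueryPlus`, `org.apache.druid.query.context.ResponseContext`) are assumed to be available as in the rest of this file. Consuming the returned sequence to completion is what triggers the closing runner above and releases the acquired sink references.

  // Hypothetical helper, shown only to illustrate the contract of the method above.
  <T> List<T> runAgainstSinks(
      final SinkQuerySegmentWalker walker,          // the walker shown above
      final Query<T> query,
      final Iterable<SegmentDescriptor> descriptors // segments to query, e.g. taken from the timeline
  )
  {
    final QueryRunner<T> runner = walker.getQueryRunnerForSegments(query, descriptors);
    // Materializing the sequence runs the query; the closing runner then releases
    // every acquired SinkSegmentReference, even if iteration fails midway.
    return runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()).toList();
  }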