private DataSource inlineIfNecessary()

in server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java [360:551]


  /**
   * Recursively walks a datasource tree and decides, for every {@link QueryDataSource} (subquery) it meets, whether
   * the subquery is left in place or executed and replaced by the result of {@code toInlineDataSource}.
   *
   * The decision for a subquery is made in this order:
   *
   * 1) If the conglomerate supplies a {@link QueryLogic} for the subquery, it is run through that logic and inlined.
   * 2) If {@code toolChestIfOutermost} is non-null and reports that it can perform the subquery, the chain of nested
   *    query datasources is kept; only what lies underneath the chain is processed. If the subquery's own toolchest
   *    can no longer perform the rebuilt subquery, the rebuilt datasource is processed again.
   * 3) If the subquery can be run using the local walker or the cluster walker, it is run and inlined.
   * 4) Otherwise its children are processed one level deeper and the rebuilt datasource is processed again.
   *
   * A datasource that is not a {@link QueryDataSource} is rebuilt from its recursively processed children.
   *
   * @param dataSource                        datasource to process
   * @param toolChestIfOutermost              if non-null, consulted through {@code canPerformSubquery} to decide
   *                                          whether a subquery may be left in place; recursive calls on children
   *                                          always pass null here
   * @param subqueryRowLimitAccumulator       forwarded to {@code toInlineDataSource} and to recursive calls
   * @param subqueryMemoryLimitAccumulator    forwarded to {@code toInlineDataSource} and to recursive calls
   * @param cannotMaterializeToFrames         read when picking the result serialization mode for an executed
   *                                          subquery; also forwarded to {@code toInlineDataSource}
   * @param maxSubqueryRows                   forwarded to {@code toInlineDataSource} and to recursive calls
   * @param maxSubqueryMemory                 used together with {@code cannotMaterializeToFrames} to pick the result
   *                                          serialization mode; also forwarded to {@code toInlineDataSource}
   * @param useNestedForUnknownTypeInSubquery forwarded to {@code toInlineDataSource} and to recursive calls
   * @param dryRun                            if true, no subquery is executed; an empty sequence stands in for the
   *                                          results of every subquery that would otherwise have been run
   *
   * @return the processed datasource
   */
  private <T> DataSource inlineIfNecessary(
      final DataSource dataSource,
      @Nullable final QueryToolChest toolChestIfOutermost,
      final AtomicInteger subqueryRowLimitAccumulator,
      final AtomicLong subqueryMemoryLimitAccumulator,
      final AtomicBoolean cannotMaterializeToFrames,
      final int maxSubqueryRows,
      final long maxSubqueryMemory,
      final boolean useNestedForUnknownTypeInSubquery,
      final boolean dryRun
  )
  {
    if (!(dataSource instanceof QueryDataSource)) {
      // Not a subquery itself, so anything that needs inlining can only be found among its children.
      final List<DataSource> processedChildren = new ArrayList<>();
      for (DataSource child : dataSource.getChildren()) {
        processedChildren.add(
            inlineIfNecessary(
                child,
                null,
                subqueryRowLimitAccumulator,
                subqueryMemoryLimitAccumulator,
                cannotMaterializeToFrames,
                maxSubqueryRows,
                maxSubqueryMemory,
                useNestedForUnknownTypeInSubquery,
                dryRun
            )
        );
      }
      return dataSource.withChildren(processedChildren);
    }

    // From here on, the datasource is known to be a subquery.
    final Query subQuery = ((QueryDataSource) dataSource).getQuery();
    final QueryToolChest toolChest = conglomerate.getToolChest(subQuery);
    final QueryLogic subQueryLogic = conglomerate.getQueryLogic(subQuery);

    if (subQueryLogic != null) {
      // The conglomerate has dedicated logic for this subquery: run it through that entry point and inline.
      final Sequence<?> logicResults;

      if (!dryRun) {
        final String serializationMode = ClientQuerySegmentWalkerUtils
            .getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())
            .serializationMode()
            .toString();
        final Query serializedSubQuery = subQuery.withOverriddenContext(
            Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, serializationMode)
        );
        logicResults = subQueryLogic
            .entryPoint(serializedSubQuery, this)
            .run(QueryPlus.wrap(serializedSubQuery), DirectDruidClient.makeResponseContextForQuery());
      } else {
        // Dry run: nothing is executed, an empty result set is inlined instead.
        logicResults = Sequences.empty();
      }

      return toInlineDataSource(
          subQuery,
          logicResults,
          (QueryToolChest) new QueryLogicCompatToolChest(subQuery.getResultRowSignature()),
          subqueryRowLimitAccumulator,
          subqueryMemoryLimitAccumulator,
          cannotMaterializeToFrames,
          maxSubqueryRows,
          maxSubqueryMemory,
          useNestedForUnknownTypeInSubquery,
          subqueryStatsProvider,
          !dryRun,
          emitter
      );
    }

    if (toolChestIfOutermost != null && toolChestIfOutermost.canPerformSubquery(subQuery)) {
      // The outer toolchest can handle this subquery. Peel off the chain of nested query datasources, process
      // whatever sits underneath it (e.g. subqueries nested under a join), then put the chain back together.
      final Stack<DataSource> peeledLayers = new Stack<>();

      DataSource cursor = dataSource;
      while (cursor instanceof QueryDataSource) {
        peeledLayers.push(cursor);
        cursor = Iterables.getOnlyElement(cursor.getChildren());
      }

      // Defensive check: the loop above only exits once the cursor is no longer a query datasource.
      if (cursor instanceof QueryDataSource) {
        throw new ISE("Got a QueryDataSource[%s], should've walked it away in the loop above.", cursor);
      }

      DataSource rebuilt = inlineIfNecessary(
          cursor,
          null,
          subqueryRowLimitAccumulator,
          subqueryMemoryLimitAccumulator,
          cannotMaterializeToFrames,
          maxSubqueryRows,
          maxSubqueryMemory,
          useNestedForUnknownTypeInSubquery,
          dryRun
      );

      // Re-wrap from the innermost peeled layer outwards.
      while (!peeledLayers.isEmpty()) {
        rebuilt = peeledLayers.pop().withChildren(Collections.singletonList(rebuilt));
      }

      if (!(rebuilt instanceof QueryDataSource)) {
        throw new ISE("Should have a QueryDataSource, but got[%s] instead", rebuilt);
      }

      if (!toolChest.canPerformSubquery(((QueryDataSource) rebuilt).getQuery())) {
        // Processing the lower levels changed the subquery such that the toolchest can no longer handle it, so the
        // rebuilt datasource has to be considered for inlining again.
        return inlineIfNecessary(
            rebuilt,
            toolChestIfOutermost,
            subqueryRowLimitAccumulator,
            subqueryMemoryLimitAccumulator,
            cannotMaterializeToFrames,
            maxSubqueryRows,
            maxSubqueryMemory,
            useNestedForUnknownTypeInSubquery,
            dryRun
        );
      }

      return rebuilt;
    }

    if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) {
      // The subquery is directly runnable by one of the walkers: run it and inline its results.
      final Sequence<?> walkerResults;

      if (!dryRun) {
        final String serializationMode = ClientQuerySegmentWalkerUtils
            .getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())
            .serializationMode()
            .toString();
        final Query serializedSubQuery = subQuery.withOverriddenContext(
            Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, serializationMode)
        );
        walkerResults = serializedSubQuery
            .getRunner(this)
            .run(QueryPlus.wrap(serializedSubQuery), DirectDruidClient.makeResponseContextForQuery());
      } else {
        // Dry run: nothing is executed, an empty result set is inlined instead.
        walkerResults = Sequences.empty();
      }

      return toInlineDataSource(
          subQuery,
          walkerResults,
          toolChest,
          subqueryRowLimitAccumulator,
          subqueryMemoryLimitAccumulator,
          cannotMaterializeToFrames,
          maxSubqueryRows,
          maxSubqueryMemory,
          useNestedForUnknownTypeInSubquery,
          subqueryStatsProvider,
          !dryRun,
          emitter
      );
    }

    // The subquery cannot be run as it stands. Process its children one level down, then retry with the result.
    final List<DataSource> deeperChildren =
        dataSource.getChildren()
                  .stream()
                  .map(child -> inlineIfNecessary(
                      child,
                      null,
                      subqueryRowLimitAccumulator,
                      subqueryMemoryLimitAccumulator,
                      cannotMaterializeToFrames,
                      maxSubqueryRows,
                      maxSubqueryMemory,
                      useNestedForUnknownTypeInSubquery,
                      dryRun
                  ))
                  .collect(Collectors.toList());

    return inlineIfNecessary(
        dataSource.withChildren(deeperChildren),
        toolChestIfOutermost,
        subqueryRowLimitAccumulator,
        subqueryMemoryLimitAccumulator,
        cannotMaterializeToFrames,
        maxSubqueryRows,
        maxSubqueryMemory,
        useNestedForUnknownTypeInSubquery,
        dryRun
    );
  }