protected QuerySpec getQuery()

in druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java [1001:1155]


  protected QuerySpec getQuery(RelDataType rowType, Filter filter,
      @Nullable Project project, @Nullable ImmutableBitSet groupSet,
      @Nullable List<AggregateCall> aggCalls, @Nullable List<String> aggNames,
      @Nullable List<Integer> collationIndexes,
      @Nullable List<Direction> collationDirections,
      ImmutableBitSet numericCollationIndexes, @Nullable Integer fetch,
      @Nullable Project postProject, @Nullable Filter havingFilter) {
    // Handle filter: translate the Calcite filter condition into a Druid JSON filter.
    final @Nullable DruidJsonFilter jsonFilter = computeFilter(filter);
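    // Illustrative only: a condition such as "countryName" = 'US' translates to a
    // native Druid JSON filter of the form
    //   {"type": "selector", "dimension": "countryName", "value": "US"}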

    if (groupSet == null) {
      // It is a Scan query, since there is no grouping.
      assert aggCalls == null;
      assert aggNames == null;
      assert collationIndexes == null || collationIndexes.isEmpty();
      assert collationDirections == null || collationDirections.isEmpty();
      final List<String> scanColumnNames;
      final List<VirtualColumn> virtualColumnList = new ArrayList<>();
      if (project != null) {
        // Project only the requested fields.
        Pair<List<String>, List<VirtualColumn>> projectResult =
            computeProjectAsScan(project, project.getInput().getRowType(), this);
        scanColumnNames = projectResult.left;
        virtualColumnList.addAll(projectResult.right);
      } else {
        // Scan all the fields
        scanColumnNames = rowType.getFieldNames();
      }
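      // Illustrative only: the Scan query built below serializes to roughly
      //   {"queryType": "scan", "dataSource": "wikipedia",
      //    "intervals": ["2015-09-12/2015-09-13"],
      //    "columns": ["__time", "page", "added"], "limit": 100}
      // with the exact shape determined by ScanQuery.toQuery().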
      final ScanQuery scanQuery =
          new ScanQuery(druidTable.dataSource, intervals, jsonFilter,
              virtualColumnList, scanColumnNames, fetch);
      return new QuerySpec(QueryType.SCAN, scanQuery.toQuery(), scanColumnNames);
    }

    // At this stage we have a valid aggregate, so the query is one of Timeseries,
    // TopN, or GroupBy. Handling aggregate and sort is more complex, since we need
    // to extract the conditions to know whether the query will be executed as a
    // Timeseries, TopN, or GroupBy in Druid.
    requireNonNull(aggCalls, "aggCalls");
    requireNonNull(aggNames, "aggNames");
    checkArgument(aggCalls.size() == aggNames.size());

    final List<JsonExpressionPostAgg> postAggs = new ArrayList<>();
    final JsonLimit limit;
    final RelDataType aggInputRowType = table.getRowType();
    final List<String> aggregateStageFieldNames = new ArrayList<>();

    Pair<List<DimensionSpec>, List<VirtualColumn>> projectGroupSet =
        computeProjectGroupSet(project, groupSet, aggInputRowType, this);
    requireNonNull(projectGroupSet, "projectGroupSet");

    final List<DimensionSpec> groupByKeyDims = projectGroupSet.left;
    final List<VirtualColumn> virtualColumnList = projectGroupSet.right;
    for (DimensionSpec dim : groupByKeyDims) {
      aggregateStageFieldNames.add(dim.getOutputName());
    }
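    // Illustrative only: each DimensionSpec serializes to a Druid dimension such as
    //   {"type": "default", "dimension": "countryName", "outputName": "country"}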
    final List<JsonAggregation> aggregations =
        computeDruidJsonAgg(aggCalls, aggNames, project, this);
    requireNonNull(aggregations, "aggregations");
    for (JsonAggregation jsonAgg : aggregations) {
      aggregateStageFieldNames.add(jsonAgg.name);
    }
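    // Illustrative only: each JsonAggregation serializes to a Druid aggregator
    // such as
    //   {"type": "longSum", "name": "total_added", "fieldName": "added"}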
    final @Nullable DruidJsonFilter havingJsonFilter;
    if (havingFilter != null) {
      havingJsonFilter =
          DruidJsonFilter.toDruidFilters(havingFilter.getCondition(),
              havingFilter.getInput().getRowType(), this,
              getCluster().getRexBuilder());
    } else {
      havingJsonFilter = null;
    }
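    // Illustrative only: a HAVING condition such as SUM(added) > 100 becomes a
    // Druid filter evaluated against the aggregated rows, e.g.
    //   {"type": "bound", "dimension": "total_added", "lower": "100",
    //    "lowerStrict": true, "ordering": "numeric"}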

    // Then we handle projects after the aggregate as Druid post-aggregations.
    final List<String> postAggregateStageFieldNames;
    if (postProject != null) {
      final List<String> postProjectDimListBuilder = new ArrayList<>();
      final RelDataType postAggInputRowType = getCluster().getTypeFactory()
          .createStructType(Pair.right(postProject.getInput().getRowType().getFieldList()),
              aggregateStageFieldNames);
      final Set<String> existingAggFieldsNames = new HashSet<>(aggregateStageFieldNames);
      // This is an index of the existing columns coming out of the aggregate layer.
      // We use it to filter out any later project that does not change values,
      // e.g. an input ref or an identity cast.
      Map<String, String> existingProjects = Maps
          .uniqueIndex(aggregateStageFieldNames, DruidExpressions::fromColumn);
      for (Pair<RexNode, String> pair : postProject.getNamedProjects()) {
        final RexNode postProjectRexNode = pair.left;
        final String expression = DruidExpressions
            .toDruidExpression(postProjectRexNode, postAggInputRowType, this);
        final String existingFieldName = existingProjects.get(expression);
        if (existingFieldName != null) {
          // A simple input ref or a Druid runtime identity cast is skipped, since
          // the field is already present.
          postProjectDimListBuilder.add(existingFieldName);
        } else {
          final String uniquelyProjectFieldName =
              SqlValidatorUtil.uniquify(pair.right,
                  existingAggFieldsNames, SqlValidatorUtil.EXPR_SUGGESTER);
          postAggs.add(new JsonExpressionPostAgg(uniquelyProjectFieldName, expression, null));
          postProjectDimListBuilder.add(uniquelyProjectFieldName);
          existingAggFieldsNames.add(uniquelyProjectFieldName);
        }
      }
      postAggregateStageFieldNames = postProjectDimListBuilder;
    } else {
      postAggregateStageFieldNames = null;
    }
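    // Illustrative only: each JsonExpressionPostAgg serializes to a Druid
    // expression post-aggregator such as
    //   {"type": "expression", "name": "avg_added",
    //    "expression": "(\"total_added\" / \"row_count\")"}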

    // Final query output row field names.
    final List<String> queryOutputFieldNames = postAggregateStageFieldNames == null
        ? aggregateStageFieldNames
        : postAggregateStageFieldNames;

    // Handle sort and limit together.
    limit =
        computeSort(fetch, collationIndexes, collationDirections,
            numericCollationIndexes, queryOutputFieldNames);
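    // Illustrative only: for a GroupBy query the limit serializes to a limitSpec
    // such as
    //   {"type": "default", "limit": 10, "columns": [
    //     {"dimension": "country", "direction": "descending",
    //      "dimensionOrder": "numeric"}]}
    // where numericCollationIndexes flags the sort keys that use "numeric" rather
    // than "lexicographic" ordering.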

    final String timeSeriesQueryString =
        planAsTimeSeries(groupByKeyDims, jsonFilter,
            virtualColumnList, aggregations, postAggs, limit, havingJsonFilter);
    if (timeSeriesQueryString != null) {
      final String timeExtractColumn = groupByKeyDims.isEmpty()
          ? null
          : groupByKeyDims.get(0).getOutputName();
      if (timeExtractColumn != null) {
        // We have transformed the GROUP BY on time into a Druid Timeseries query
        // with a granularity; replace the column name with the Druid timestamp
        // field name.
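        // Illustrative only: a Timeseries result row looks like
        //   {"timestamp": "2012-01-01T00:00:00.000Z", "result": {"total_added": 9}}
        // hence the rename below.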
        final List<String> timeseriesFieldNames =
            Util.transform(queryOutputFieldNames, input -> {
              if (timeExtractColumn.equals(input)) {
                return "timestamp";
              }
              return input;
            });
        return new QuerySpec(QueryType.TIMESERIES, timeSeriesQueryString, timeseriesFieldNames);
      }
      return new QuerySpec(QueryType.TIMESERIES, timeSeriesQueryString, queryOutputFieldNames);
    }
    final String topNQuery =
        planAsTopN(groupByKeyDims, jsonFilter,
            virtualColumnList, aggregations, postAggs, limit, havingJsonFilter);
    if (topNQuery != null) {
      return new QuerySpec(QueryType.TOP_N, topNQuery, queryOutputFieldNames);
    }
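    // Illustrative note: planAsTopN succeeds only in the narrow case Druid's TopN
    // query supports (roughly a single group-by dimension with a metric-ordered
    // limit); otherwise we fall back to GroupBy, the most general query type.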

    final String groupByQuery =
        planAsGroupBy(groupByKeyDims, jsonFilter,
            virtualColumnList, aggregations, postAggs, limit, havingJsonFilter);

    if (groupByQuery == null) {
      throw new IllegalStateException("Cannot plan Druid Query");
    }
    return new QuerySpec(QueryType.GROUP_BY, groupByQuery, queryOutputFieldNames);
  }
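
A minimal caller-side sketch (not part of the original file) of consuming the
returned QuerySpec; it assumes the queryType, queryString and fieldNames members
are accessible as they are used within this class:

  void describeQuery(QuerySpec spec) {
    // One of SCAN, TIMESERIES, TOP_N or GROUP_BY, matching the branches above.
    System.out.println("Druid query type: " + spec.queryType);
    // The native Druid JSON that will be posted to the broker.
    System.out.println("Query JSON: " + spec.queryString);
    // Column names, in order, expected in each result row.
    System.out.println("Output columns: " + spec.fieldNames);
  }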