in druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java [1001:1155]
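/** Translates this query's relational operators into a Druid JSON query.
*
* <p>If there is no grouping, the result is a Scan query. Otherwise the method
* tries to plan, in order of preference, a Timeseries query, then a TopN query,
* and finally falls back to a GroupBy query. */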
protected QuerySpec getQuery(RelDataType rowType, Filter filter,
@Nullable Project project, @Nullable ImmutableBitSet groupSet,
@Nullable List<AggregateCall> aggCalls, @Nullable List<String> aggNames,
@Nullable List<Integer> collationIndexes,
@Nullable List<Direction> collationDirections,
ImmutableBitSet numericCollationIndexes, @Nullable Integer fetch,
@Nullable Project postProject, @Nullable Filter havingFilter) {
// Handle the filter: translate the Calcite condition into a Druid JSON filter
final @Nullable DruidJsonFilter jsonFilter = computeFilter(filter);
if (groupSet == null) {
// No grouping, so this is a Druid Scan query
assert aggCalls == null;
assert aggNames == null;
assert collationIndexes == null || collationIndexes.isEmpty();
assert collationDirections == null || collationDirections.isEmpty();
final List<String> scanColumnNames;
final List<VirtualColumn> virtualColumnList = new ArrayList<>();
if (project != null) {
// Project only some of the fields
Pair<List<String>, List<VirtualColumn>> projectResult =
computeProjectAsScan(project, project.getInput().getRowType(), this);
scanColumnNames = projectResult.left;
virtualColumnList.addAll(projectResult.right);
} else {
// Scan all the fields
scanColumnNames = rowType.getFieldNames();
}
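// A Druid Scan query streams raw rows; 'fetch' becomes the Scan query's row limit.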
final ScanQuery scanQuery =
new ScanQuery(druidTable.dataSource, intervals, jsonFilter,
virtualColumnList, scanColumnNames, fetch);
return new QuerySpec(QueryType.SCAN, scanQuery.toQuery(), scanColumnNames);
}
// At this stage we have a valid Aggregate, so the query is one of Timeseries,
// TopN, or GroupBy. Handling the aggregation and sort together is more complex,
// since we need to extract the conditions that determine whether Druid will
// execute the query as a Timeseries, TopN, or GroupBy.
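// For example, a query whose only group key is FLOOR(__time TO DAY) can run as
// a Timeseries query with day granularity, while a single-dimension GROUP BY
// with a LIMIT and a sort on a metric can run as a TopN query.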
requireNonNull(aggCalls, "aggCalls");
requireNonNull(aggNames, "aggNames");
checkArgument(aggCalls.size() == aggNames.size());
final List<JsonExpressionPostAgg> postAggs = new ArrayList<>();
final JsonLimit limit;
final RelDataType aggInputRowType = table.getRowType();
final List<String> aggregateStageFieldNames = new ArrayList<>();
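// Translate the GROUP BY keys into Druid dimension specs; keys that are not
// simple column references are registered as Druid virtual columns.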
Pair<List<DimensionSpec>, List<VirtualColumn>> projectGroupSet =
computeProjectGroupSet(project, groupSet, aggInputRowType, this);
requireNonNull(projectGroupSet, "projectGroupSet");
final List<DimensionSpec> groupByKeyDims = projectGroupSet.left;
final List<VirtualColumn> virtualColumnList = projectGroupSet.right;
for (DimensionSpec dim : groupByKeyDims) {
aggregateStageFieldNames.add(dim.getOutputName());
}
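// Translate the aggregate calls into Druid JSON aggregations.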
final List<JsonAggregation> aggregations =
computeDruidJsonAgg(aggCalls, aggNames, project, this);
requireNonNull(aggregations, "aggregations");
for (JsonAggregation jsonAgg : aggregations) {
aggregateStageFieldNames.add(jsonAgg.name);
}
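// A HAVING filter, if present, is translated into a Druid filter applied
// after aggregation.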
final @Nullable DruidJsonFilter havingJsonFilter;
if (havingFilter != null) {
havingJsonFilter =
DruidJsonFilter.toDruidFilters(havingFilter.getCondition(),
havingFilter.getInput().getRowType(), this,
getCluster().getRexBuilder());
} else {
havingJsonFilter = null;
}
// Then we handle projects on top of the aggregate as Druid post-aggregations
final List<String> postAggregateStageFieldNames;
if (postProject != null) {
final List<String> postProjectDimListBuilder = new ArrayList<>();
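// The row type seen by the post-project: the aggregate output field types,
// renamed to the aggregate stage field names used on the Druid side.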
final RelDataType postAggInputRowType = getCluster().getTypeFactory()
.createStructType(Pair.right(postProject.getInput().getRowType().getFieldList()),
aggregateStageFieldNames);
final Set<String> existingAggFieldsNames = new HashSet<>(aggregateStageFieldNames);
// This is an index of the existing columns coming out of the aggregate layer.
// It is used to filter out any downstream project that does not change values,
// e.g. an input ref or an identity cast.
final Map<String, String> existingProjects =
Maps.uniqueIndex(aggregateStageFieldNames, DruidExpressions::fromColumn);
for (Pair<RexNode, String> pair : postProject.getNamedProjects()) {
final RexNode postProjectRexNode = pair.left;
final String expression = DruidExpressions
.toDruidExpression(postProjectRexNode, postAggInputRowType, this);
final String existingFieldName = existingProjects.get(expression);
if (existingFieldName != null) {
// A simple input ref or a Druid runtime identity cast: skip it, since the
// column is already present.
postProjectDimListBuilder.add(existingFieldName);
} else {
final String uniquelyProjectFieldName =
SqlValidatorUtil.uniquify(pair.right,
existingAggFieldsNames, SqlValidatorUtil.EXPR_SUGGESTER);
postAggs.add(new JsonExpressionPostAgg(uniquelyProjectFieldName, expression, null));
postProjectDimListBuilder.add(uniquelyProjectFieldName);
existingAggFieldsNames.add(uniquelyProjectFieldName);
}
}
postAggregateStageFieldNames = postProjectDimListBuilder;
} else {
postAggregateStageFieldNames = null;
}
// Final query output row field names.
final List<String> queryOutputFieldNames = postAggregateStageFieldNames == null
? aggregateStageFieldNames
: postAggregateStageFieldNames;
// Handle sort and limit together
limit =
computeSort(fetch, collationIndexes, collationDirections,
numericCollationIndexes, queryOutputFieldNames);
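// First try to plan as a Timeseries query; this is possible only when the
// sole group-by key, if any, is a granular extraction of the time column.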
final String timeSeriesQueryString =
planAsTimeSeries(groupByKeyDims, jsonFilter,
virtualColumnList, aggregations, postAggs, limit, havingJsonFilter);
if (timeSeriesQueryString != null) {
final @Nullable String timeExtractColumn = groupByKeyDims.isEmpty()
? null
: groupByKeyDims.get(0).getOutputName();
if (timeExtractColumn != null) {
// We have transformed the GROUP BY on time into a Druid Timeseries query with
// a granularity, so replace the column name with the Druid timestamp field name.
final List<String> timeseriesFieldNames =
Util.transform(queryOutputFieldNames, input -> {
if (timeExtractColumn.equals(input)) {
return "timestamp";
}
return input;
});
return new QuerySpec(QueryType.TIMESERIES, timeSeriesQueryString, timeseriesFieldNames);
}
return new QuerySpec(QueryType.TIMESERIES, timeSeriesQueryString, queryOutputFieldNames);
}
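// Timeseries did not apply; next try TopN, which Druid supports for a single
// group-by dimension with a limit and a sort on a metric.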
final String topNQuery =
planAsTopN(groupByKeyDims, jsonFilter,
virtualColumnList, aggregations, postAggs, limit, havingJsonFilter);
if (topNQuery != null) {
return new QuerySpec(QueryType.TOP_N, topNQuery, queryOutputFieldNames);
}
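// Fall back to GroupBy, the most general Druid query type.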
final String groupByQuery =
planAsGroupBy(groupByKeyDims, jsonFilter,
virtualColumnList, aggregations, postAggs, limit, havingJsonFilter);
if (groupByQuery == null) {
throw new IllegalStateException("Cannot plan Druid Query");
}
return new QuerySpec(QueryType.GROUP_BY, groupByQuery, queryOutputFieldNames);
}