in fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java [1243:1378]
private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
  // Registers 'scanNode' as a target of every runtime filter recorded in
  // runtimeFiltersByTid_ for the scan's first tuple id, skipping the filters that this
  // scan cannot apply. Only HdfsScanNode and KuduScanNode are considered; for any other
  // scan node type, or when no filters are recorded for the tuple id, nothing is done.
  final boolean isHdfsScan = scanNode instanceof HdfsScanNode;
  if (!isHdfsScan && !(scanNode instanceof KuduScanNode)) return;
  // Null for Kudu scans; only dereferenced on paths guarded by 'isHdfsScan'.
  final HdfsScanNode hdfsScan = isHdfsScan ? (HdfsScanNode) scanNode : null;

  TupleId scanTid = scanNode.getTupleIds().get(0);
  if (!runtimeFiltersByTid_.containsKey(scanTid)) return;

  Analyzer analyzer = ctx.getRootAnalyzer();
  boolean rowFilteringDisabled =
      ctx.getQueryOptions().isDisable_row_runtime_filtering();
  boolean overlapFilterEnabled = enableOverlapFilter(ctx.getQueryOptions());
  TRuntimeFilterMode filterMode = ctx.getQueryOptions().getRuntime_filter_mode();
  Set<TRuntimeFilterType> enabledTypes =
      ctx.getQueryOptions().getEnabled_runtime_filter_types();

  // The overlap predicate of an HDFS scan is initialized before the assignment loop and
  // finalized after it, and only when overlap filtering is enabled.
  final boolean useOverlapPredicate = isHdfsScan && overlapFilterEnabled;
  if (useOverlapPredicate) hdfsScan.initOverlapPredicate(analyzer);

  for (RuntimeFilter filter : runtimeFiltersByTid_.get(scanTid)) {
    // Finalized filters do not receive further targets.
    if (filter.isFinalized()) continue;

    Expr targetExpr = computeTargetExpr(filter, scanTid, analyzer);
    if (targetExpr == null) continue;

    boolean partitionBound = isBoundByPartitionColumns(analyzer, targetExpr, scanNode);
    // With row-level filtering disabled, only partition-bound targets are kept.
    if (rowFilteringDisabled && !partitionBound) continue;

    boolean inDataFile =
        isColumnInDataFile(scanNode.getTupleDesc().getTable(), partitionBound);
    boolean localTarget = isLocalTarget(filter, scanNode);
    // LOCAL mode admits local targets only.
    if (!localTarget && filterMode == TRuntimeFilterMode.LOCAL) continue;

    // Verify that this kind of scan can apply a filter of this type on 'targetExpr'.
    if (isHdfsScan) {
      // Timestamp-truncation filters are never assigned to HDFS scans.
      if (filter.isTimestampTruncation()) continue;

      TRuntimeFilterType filterType = filter.getType();
      if (filterType == TRuntimeFilterType.MIN_MAX) {
        Preconditions.checkState(
            enabledTypes.contains(TRuntimeFilterType.MIN_MAX),
            "MIN_MAX filters should not be generated");
        if (!overlapFilterEnabled) continue;
        // A min/max filter is only usable on an HDFS scan through an overlap predicate,
        // which filters out partitions, or row groups, pages or rows in Parquet data
        // files. Skip the filter if no such predicate can be computed.
        boolean overlapComputed = hdfsScan.tryToComputeOverlapPredicate(
            analyzer, filter, targetExpr, partitionBound);
        if (!overlapComputed) continue;
      } else if (filterType == TRuntimeFilterType.IN_LIST
          && !hdfsScan.getFileFormats().contains(HdfsFileFormat.ORC)) {
        // IN-list filters are only assigned on ORC tables.
        continue;
      }
    } else {
      // Target is a KuduScanNode.
      TRuntimeFilterType filterType = filter.getType();
      if (filterType == TRuntimeFilterType.BLOOM) {
        Preconditions.checkState(
            enabledTypes.contains(TRuntimeFilterType.BLOOM),
            "BLOOM filters should not be generated!");
        // TODO: Support Kudu VARCHAR Bloom Filter
        if (targetExpr.getType().isVarchar()) continue;
        // Kudu can only target a single column, not a general expr, so the target has
        // to be a SlotRef on a column with no cast around it.
        if (!(targetExpr instanceof SlotRef)) continue;
        if (filter.getExprCompOp() == Operator.NOT_DISTINCT) continue;
        // A timestamp bloom filter goes to Kudu only if its source side has timestamp
        // truncation.
        if (targetExpr.getType().isTimestamp() && !filter.isTimestampTruncation()) {
          continue;
        }
        SlotRef kuduSlotRef = (SlotRef) targetExpr;
        if (kuduSlotRef.getDesc().getColumn() == null) continue;
        if (filter.isTimestampTruncation()
            && analyzer.getQueryOptions().isConvert_kudu_utc_timestamps()
            && analyzer.getQueryOptions().isDisable_kudu_local_timestamp_bloom_filter()) {
          // Converting a local timestamp to UTC can be ambiguous around a DST change.
          // For now only one of the two candidate UTC timestamps can be put in the
          // bloom filter, so rows carrying the other one could be missed. Regions that
          // do not observe DST can set this flag to false to re-enable the kudu local
          // timestamp bloom filter.
          LOG.info("Skipping runtime filter because kudu local timestamp bloom filter "
              + "is disabled: " + filter.getSrcExpr().toSql());
          continue;
        }
      } else if (filterType == TRuntimeFilterType.MIN_MAX) {
        Preconditions.checkState(
            enabledTypes.contains(TRuntimeFilterType.MIN_MAX),
            "MIN_MAX filters should not be generated!");
        // TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
        if (targetExpr.getType().isVarchar()) continue;
        SlotRef kuduSlotRef = targetExpr.unwrapSlotRef(true);
        // Kudu can only target a single column, not a general expr, so the target has
        // to resolve to a SlotRef on a column. Implicit integer casts are tolerated
        // because the min/max values can be cast before they are sent to Kudu. Kudu
        // also cannot currently return nulls once a filter is applied, which rules out
        // "is not distinct".
        if (kuduSlotRef == null
            || kuduSlotRef.getDesc().getColumn() == null
            || (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
            || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
          continue;
        }
      } else {
        Preconditions.checkState(filterType == TRuntimeFilterType.IN_LIST);
        Preconditions.checkState(
            enabledTypes.contains(TRuntimeFilterType.IN_LIST),
            "IN_LIST filters should not be generated!");
        // TODO(IMPALA-11066): Runtime IN-list filters for Kudu tables
        continue;
      }
    }

    // For HDFS scans, pass along the low/high values from the stats of the target
    // column when the target unwraps to a slot backed by a column. Both values stay
    // null otherwise, and always for Kudu scans.
    TColumnValue lowValue = null;
    TColumnValue highValue = null;
    if (isHdfsScan) {
      SlotRef scanSlotRef = targetExpr.unwrapSlotRef(true);
      Column targetColumn =
          scanSlotRef == null ? null : scanSlotRef.getDesc().getColumn();
      if (targetColumn != null) {
        lowValue = targetColumn.getStats().getLowValue();
        highValue = targetColumn.getStats().getHighValue();
      }
    }

    filter.addTarget(new RuntimeFilter.RuntimeFilterTarget(scanNode, targetExpr,
        partitionBound, inDataFile, localTarget, lowValue, highValue));
  }

  // Finalize the overlap predicate that was initialized before the loop.
  if (useOverlapPredicate) hdfsScan.finalizeOverlapPredicate();
}