private void assignRuntimeFilters()

in fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java [1243:1378]


  private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
    if (!(scanNode instanceof HdfsScanNode || scanNode instanceof KuduScanNode)) return;
    TupleId tid = scanNode.getTupleIds().get(0);
    if (!runtimeFiltersByTid_.containsKey(tid)) return;
    Analyzer analyzer = ctx.getRootAnalyzer();
    boolean disableRowRuntimeFiltering =
        ctx.getQueryOptions().isDisable_row_runtime_filtering();
    boolean enable_overlap_filter = enableOverlapFilter(ctx.getQueryOptions());
    TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode();
    Set<TRuntimeFilterType> enabledRuntimeFilterTypes =
        ctx.getQueryOptions().getEnabled_runtime_filter_types();

    // Init the overlap predicate for the hdfs scan node.
    if (scanNode instanceof HdfsScanNode && enable_overlap_filter) {
      ((HdfsScanNode) scanNode).initOverlapPredicate(analyzer);
    }

    for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
      if (filter.isFinalized()) continue;
      Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
      if (targetExpr == null) continue;
      boolean isBoundByPartitionColumns = isBoundByPartitionColumns(analyzer, targetExpr,
          scanNode);
      if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
      boolean isColumnInDataFile = isColumnInDataFile(scanNode.getTupleDesc().getTable(),
          isBoundByPartitionColumns);
      boolean isLocalTarget = isLocalTarget(filter, scanNode);
      if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;

      // Check that the scan node supports applying filters of this type and targetExpr.
      if (scanNode instanceof HdfsScanNode) {
        if (filter.isTimestampTruncation()) {
          continue;
        }
        if (filter.getType() == TRuntimeFilterType.MIN_MAX) {
          Preconditions.checkState(
              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.MIN_MAX),
              "MIN_MAX filters should not be generated");
          if (!enable_overlap_filter) continue;
          // Try to compute an overlap predicate for the filter. This predicate will be
          // used to filter out partitions, or row groups, pages or rows in Parquet data
          // files.
          if (!((HdfsScanNode) scanNode)
                   .tryToComputeOverlapPredicate(
                       analyzer, filter, targetExpr, isBoundByPartitionColumns)) {
            continue;
          }
        } else if (filter.getType() == TRuntimeFilterType.IN_LIST) {
          // Only assign IN-list filters on ORC tables.
          if (!((HdfsScanNode) scanNode).getFileFormats().contains(HdfsFileFormat.ORC)) {
            continue;
          }
        }
      } else {
        // assign filters to KuduScanNode
        if (filter.getType() == TRuntimeFilterType.BLOOM) {
          Preconditions.checkState(
              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.BLOOM),
              "BLOOM filters should not be generated!");
          // TODO: Support Kudu VARCHAR Bloom Filter
          if (targetExpr.getType().isVarchar()) continue;
          // Kudu only supports targeting a single column, not general exprs, so the
          // target must be a SlotRef pointing to a column without casting.
          // For timestamp bloom filter, assign it to Kudu if it has src timestamp
          // truncation.
          if (!(targetExpr instanceof SlotRef)
              || filter.getExprCompOp() == Operator.NOT_DISTINCT
              || (targetExpr.getType().isTimestamp()
                     && !filter.isTimestampTruncation())) {
            continue;
          }
          SlotRef slotRef = (SlotRef) targetExpr;
          if (slotRef.getDesc().getColumn() == null) continue;
          if (filter.isTimestampTruncation() &&
              analyzer.getQueryOptions().isConvert_kudu_utc_timestamps() &&
              analyzer.getQueryOptions().isDisable_kudu_local_timestamp_bloom_filter()) {
            // Local timestamp convert to UTC could be ambiguous in the case of DST
            // change. We can only put one of the two possible UTC timestamps in the bloom
            // filter for now, which may cause missing rows that have the other UTC
            // timestamp.
            // For those regions that do not observe DST, could set this flag to false
            // to re-enable kudu local timestamp bloom filter.
            LOG.info("Skipping runtime filter because kudu local timestamp bloom filter "
                + "is disabled: " + filter.getSrcExpr().toSql());
            continue;
          }
        } else if (filter.getType() == TRuntimeFilterType.MIN_MAX) {
          Preconditions.checkState(
              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.MIN_MAX),
              "MIN_MAX filters should not be generated!");
          // TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
          if (targetExpr.getType().isVarchar()) continue;
          SlotRef slotRef = targetExpr.unwrapSlotRef(true);
          // Kudu only supports targeting a single column, not general exprs, so the
          // target must be a SlotRef pointing to a column. We can allow implicit
          // integer casts by casting the min/max values before sending them to Kudu.
          // Kudu also cannot currently return nulls if a filter is applied, so it
          // does not work with "is not distinct".
          if (slotRef == null || slotRef.getDesc().getColumn() == null
              || (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
              || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
            continue;
          }
        } else {
          Preconditions.checkState(filter.getType() == TRuntimeFilterType.IN_LIST);
          Preconditions.checkState(
              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.IN_LIST),
              "IN_LIST filters should not be generated!");
          // TODO(IMPALA-11066): Runtime IN-list filters for Kudu tables
          continue;
        }
      }
      TColumnValue lowValue = null;
      TColumnValue highValue = null;
      if (scanNode instanceof HdfsScanNode) {
        SlotRef slotRefInScan = targetExpr.unwrapSlotRef(true);
        if (slotRefInScan != null) {
          Column col = slotRefInScan.getDesc().getColumn();
          if (col != null) {
            lowValue = col.getStats().getLowValue();
            highValue = col.getStats().getHighValue();
          }
        }
      }
      RuntimeFilter.RuntimeFilterTarget target =
          new RuntimeFilter.RuntimeFilterTarget(scanNode, targetExpr,
              isBoundByPartitionColumns, isColumnInDataFile, isLocalTarget,
              lowValue, highValue);
      filter.addTarget(target);
    }

    // finalize the overlap predicate for the hdfs scan node.
    if (scanNode instanceof HdfsScanNode && enable_overlap_filter) {
      ((HdfsScanNode) scanNode).finalizeOverlapPredicate();
    }
  }