private def tryComposeIndexFilterExpr()

in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala [80:389]


  private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, isExpressionIndex: Boolean = false, indexedCols : Seq[String]): Option[Expression] = {
    //
    // For translation of the Filter Expression for the Data Table into Filter Expression for Column Stats Index, we're
    // assuming that
    //    - The column A is queried in the Data Table (hereafter referred to as "colA")
    //    - Filter Expression is a relational expression (ie "=", "<", "<=", ...) of the following form
    //
    //      ```transform_expr(colA) = value_expr```
    //
    //      Where
    //        - "transform_expr" is an expression of the _transformation_ which preserve ordering of the "colA"
    //        - "value_expr" is an "value"-expression (ie one NOT referring to other attributes/columns or containing sub-queries)
    //
    // We translate original Filter Expr into the one querying Column Stats Index like following: let's consider
    // equality Filter Expr referred to above:
    //
    //   ```transform_expr(colA) = value_expr```
    //
    // This expression will be translated into following Filter Expression for the Column Stats Index:
    //
    //   ```(transform_expr(colA_minValue) <= value_expr) AND (value_expr <= transform_expr(colA_maxValue))```
    //
    // Which will enable us to match files with the range of values in column A containing the target ```value_expr```
    //
    // NOTE: That we can apply ```transform_expr``` transformation precisely b/c it preserves the ordering of the
    //       values of the source column, ie following holds true:
    //
    //       colA_minValue = min(colA)  =>  transform_expr(colA_minValue) = min(transform_expr(colA))
    //       colA_maxValue = max(colA)  =>  transform_expr(colA_maxValue) = max(transform_expr(colA))
    //
    sourceFilterExpr match {
      // If Expression is not resolved, we can't perform the analysis accurately, bailing
      case expr if !expr.resolved && !isExpressionIndex => None

      // Filter "expr(colA) = B" and "B = expr(colA)"
      // Translates to "(expr(colA_minValue) <= B) AND (B <= expr(colA_maxValue))" condition for index lookup
      case EqualTo(sourceExpr @ AllowedTransformationExpression(attrRef), valueExpr: Expression) if isValueExpression(valueExpr) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            // NOTE: Since we're supporting (almost) arbitrary expressions of the form `f(colA) = B`, we have to
            //       appropriately translate such original expression targeted at Data Table, to corresponding
            //       expression targeted at Column Stats Index Table. For that, we take original expression holding
            //       [[AttributeReference]] referring to the Data Table, and swap it w/ expression referring to
            //       corresponding column in the Column Stats Index
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
          }.orElse({
          Option.empty
        })

      case EqualTo(valueExpr: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(valueExpr) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
          }.orElse({
          Option.empty
        })

      // Filter "expr(colA) != B" and "B != expr(colA)"
      // Translates to "NOT(expr(colA_minValue) = B AND expr(colA_maxValue) = B)"
      // NOTE: This is NOT an inversion of `colA = b`, instead this filter ONLY excludes files for which `colA = B`
      //       holds true
      case Not(EqualTo(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression)) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
          }.orElse({
          Option.empty
        })

      case Not(EqualTo(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef))) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
          }.orElse({
          Option.empty
        })

      // Filter "colA = null"
      // Translates to "colA_nullCount = null" for index lookup
      case EqualNullSafe(attrRef: AttributeReference, litNull @ Literal(null, _)) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map(colName => EqualTo(genColNumNullsExpr(colName), litNull))
          .orElse({
            Option.empty
          })

      // Filter "expr(colA) < B" and "B > expr(colA)"
      // Translates to "expr(colA_minValue) < B" for index lookup
      case LessThan(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      case GreaterThan(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
              val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
              LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      // Filter "B < expr(colA)" and "expr(colA) > B"
      // Translates to "B < colA_maxValue" for index lookup
      case LessThan(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      case GreaterThan(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      // Filter "expr(colA) <= B" and "B >= expr(colA)"
      // Translates to "colA_minValue <= B" for index lookup
      case LessThanOrEqual(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      case GreaterThanOrEqual(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      // Filter "B <= expr(colA)" and "expr(colA) >= B"
      // Translates to "B <= colA_maxValue" for index lookup
      case LessThanOrEqual(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      case GreaterThanOrEqual(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
          }.orElse({
          Option.empty
        })

      // Filter "colA is null"
      // Translates to "colA_nullCount > 0" for index lookup
      case IsNull(attribute: AttributeReference) =>
        getTargetIndexedColumnName(attribute, indexedCols)
          .map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
          .orElse({
            Option.empty
          })

      // Filter "colA is not null"
      // Translates to "colA_nullCount = null or colA_valueCount = null or colA_nullCount < colA_valueCount" for index lookup
      // "colA_nullCount = null or colA_valueCount = null" means we are not certain whether the column is null or not,
      // hence we return True to ensure this does not affect the query.
      case IsNotNull(attribute: AttributeReference) =>
        getTargetIndexedColumnName(attribute, indexedCols)
          .map {colName =>
            val numNullExpr = genColNumNullsExpr(colName)
            val valueCountExpr = genColValueCountExpr
            Or(Or(IsNull(numNullExpr), IsNull(valueCountExpr)), LessThan(numNullExpr, valueCountExpr))
          }.orElse({
          Option.empty
        })

      // Filter "expr(colA) in (B1, B2, ...)"
      // Translates to "(colA_minValue <= B1 AND colA_maxValue >= B1) OR (colA_minValue <= B2 AND colA_maxValue >= B2) ... "
      // for index lookup
      // NOTE: This is equivalent to "colA = B1 OR colA = B2 OR ..."
      case In(sourceExpr @ AllowedTransformationExpression(attrRef), list: Seq[Expression]) if list.forall(isValueExpression) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            list.map(lit => genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or)
          }.orElse({
          Option.empty
        })

      // Filter "expr(colA) in (B1, B2, ...)"
      // NOTE: [[InSet]] is an optimized version of the [[In]] expression, where every sub-expression w/in the
      //       set is a static literal
      case InSet(sourceExpr @ AllowedTransformationExpression(attrRef), hset: Set[Any]) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            hset.map { value =>
              // NOTE: [[Literal]] has a gap where it could hold [[UTF8String]], but [[Literal#apply]] doesn't
              //       accept [[UTF8String]]. As such we have to handle it separately
              val lit = value match {
                case str: UTF8String => Literal(str.toString)
                case _ => Literal(value)
              }
              genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
            }.reduce(Or)
          }.orElse({
          Option.empty
        })

      // Filter "expr(colA) not in (B1, B2, ...)"
      // Translates to "NOT((colA_minValue = B1 AND colA_maxValue = B1) OR (colA_minValue = B2 AND colA_maxValue = B2))" for index lookup
      // NOTE: This is NOT an inversion of `in (B1, B2, ...)` expr, this is equivalent to "colA != B1 AND colA != B2 AND ..."
      case Not(In(sourceExpr @ AllowedTransformationExpression(attrRef), list: Seq[Expression])) if list.forall(_.foldable) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            Not(list.map(lit => genColumnOnlyValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or))
          }.orElse({
          Option.empty
        })

      // Filter "colA like 'xxx%'"
      // Translates to "colA_minValue <= xxx AND xxx <= colA_maxValue" for index lookup
      //
      // NOTE: Since a) this operator matches strings by prefix and b) given that this column is going to be ordered
      //       lexicographically, we essentially need to check that provided literal falls w/in min/max bounds of the
      //       given column
      case StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), v @ Literal(_: UTF8String, _)) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            genColumnValuesEqualToExpression(colName, v, targetExprBuilder)
          }.orElse({
          Option.empty
        })

      // Filter "expr(colA) not like 'xxx%'"
      // Translates to "NOT(expr(colA_minValue) like 'xxx%' AND expr(colA_maxValue) like 'xxx%')" for index lookup
      // NOTE: This is NOT an inversion of "colA like xxx"
      case Not(StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), value @ Literal(_: UTF8String, _))) =>
        getTargetIndexedColumnName(attrRef, indexedCols)
          .map { colName =>
            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
            val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
            val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
            Not(And(StartsWith(minValueExpr, value), StartsWith(maxValueExpr, value)))
          }.orElse({
          Option.empty
        })

      case or: Or =>
        val resLeft = createColumnStatsIndexFilterExprInternal(or.left, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
        val resRight = createColumnStatsIndexFilterExprInternal(or.right, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
        if (resLeft.equals(LITERAL_TRUE_EXPR) || resRight.equals(LITERAL_TRUE_EXPR)) {
          None
        } else {
          Option(Or(resLeft, resRight))
        }

      case and: And =>
        val resLeft = createColumnStatsIndexFilterExprInternal(and.left, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
        val resRight = createColumnStatsIndexFilterExprInternal(and.right, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
        // only if both left and right has non indexed cols, we can set hasNonIndexedCols to true.
        // If not, we can still afford to prune files based on col stats lookup.
        val isLeftLiteralTrue = resLeft.equals(LITERAL_TRUE_EXPR)
        val isRightLiteralTrue = resRight.equals(LITERAL_TRUE_EXPR)
        if (isLeftLiteralTrue && isRightLiteralTrue) {
          None
        } else if (isLeftLiteralTrue) {
          Option(resRight) // Ignore the non-indexed left expression
        } else if (isRightLiteralTrue) {
          Option(resLeft) // Ignore the non-indexed right expression
        } else {
          Option(And(resLeft, resRight))
        }

      //
      // Pushing Logical NOT inside the AND/OR expressions
      // NOTE: This is required to make sure we're properly handling negations in
      //       cases like {@code NOT(colA = 0)}, {@code NOT(colA in (a, b, ...)}
      //

      case Not(And(left: Expression, right: Expression)) =>
        Option(createColumnStatsIndexFilterExprInternal(Or(Not(left), Not(right)), isExpressionIndex = isExpressionIndex,
          indexedCols = indexedCols))

      case Not(Or(left: Expression, right: Expression)) =>
        Option(createColumnStatsIndexFilterExprInternal(And(Not(left), Not(right)), isExpressionIndex = isExpressionIndex,
          indexedCols = indexedCols))

      case _: Expression => None
    }
  }