def convertFilters()

in sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala [645:862]
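Converts a sequence of Catalyst filter Expressions over partition columns into the
filter-string syntax accepted by the Hive metastore's getPartitionsByFilter API.
Filters are converted independently and joined with "and"; any predicate the
metastore cannot evaluate (null-safe equality, varchar/char partition keys,
non-integral/string/date types, ...) converts to None and is simply not pushed
down, which is safe because Spark re-prunes the returned partitions against the
original predicates on the client side.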


  def convertFilters(table: Table, filters: Seq[Expression]): String = {
    lazy val dateFormatter = DateFormatter()

    /**
     * An extractor that matches all binary comparison operators except null-safe equality.
     *
     * Null-safe equality (`<=>`) is not supported by Hive metastore partition predicate pushdown.
     */
    object SpecialBinaryComparison {
      def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
        case _: EqualNullSafe => None
        case _ => Some((e.left, e.right))
      }
    }

    object ExtractableLiteral {
      def unapply(expr: Expression): Option[String] = expr match {
        case Literal(null, _) => None // `null`s can be cast as other types; we want to avoid NPEs.
        case Literal(value, _: IntegralType) => Some(value.toString)
        case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString))
        case Literal(value, _: DateType) =>
          Some(quoteStringLiteral(dateFormatter.format(value.asInstanceOf[Int])))
        case _ => None
      }
    }

    object ExtractableLiterals {
      def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
        // SPARK-24879: The Hive metastore filter parser does not support "null", but we still
        // want to push down as many predicates as possible while maintaining correctness.
        // In SQL, the `IN` expression evaluates as follows:
        //  > `1 in (2, NULL)` -> NULL
        //  > `1 in (1, NULL)` -> true
        //  > `1 in (2)` -> false
        // Since Hive metastore filters are NULL-intolerant binary operations joined only by
        // `AND` and `OR`, we can treat `NULL` as `false` and thus rewrite `1 in (2, NULL)` as
        // `1 in (2)`.
        // If the Hive metastore begins supporting NULL-tolerant predicates and Spark starts
        // pushing down these predicates, then this optimization will become incorrect and need
        // to be changed.
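        // For example (illustrative): Seq(Literal(1), Literal(null), Literal(2))
        // extracts Some(Seq("1", "2")), while Seq(Literal(1), Literal(1.5))
        // extracts None because a double literal is not extractable.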
        val extractables = exprs
          .filter {
            case Literal(null, _) => false
            case _ => true
          }.map(ExtractableLiteral.unapply)
        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
          Some(extractables.map(_.get))
        } else {
          None
        }
      }
    }

    object ExtractableValues {
      private lazy val valueToLiteralString: PartialFunction[Any, String] = {
        case value: Byte => value.toString
        case value: Short => value.toString
        case value: Int => value.toString
        case value: Long => value.toString
        case value: UTF8String => quoteStringLiteral(value.toString)
      }

      def unapply(values: Set[Any]): Option[Seq[String]] = {
        val extractables = values.filter(_ != null).toSeq.map(valueToLiteralString.lift)
        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
          Some(extractables.map(_.get))
        } else {
          None
        }
      }
    }

    object ExtractableDateValues {
      private lazy val valueToLiteralString: PartialFunction[Any, String] = {
        case value: Int => quoteStringLiteral(dateFormatter.format(value))
      }

      def unapply(values: Set[Any]): Option[Seq[String]] = {
        val extractables = values.filter(_ != null).toSeq.map(valueToLiteralString.lift)
        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
          Some(extractables.map(_.get))
        } else {
          None
        }
      }
    }

    object SupportedAttribute {
      // Hive varchar and char are treated as catalyst string, but they can't be pushed down.
      private val varcharKeys = table.getPartitionKeys.asScala
        .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
          col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
        .map(col => col.getName).toSet

      def unapply(attr: Attribute): Option[String] = {
        val resolver = SQLConf.get.resolver
        if (varcharKeys.exists(c => resolver(c, attr.name))) {
          None
        } else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType ||
            attr.dataType == DateType) {
          Some(attr.name)
        } else {
          None
        }
      }
    }

    def convertInToOr(name: String, values: Seq[String]): String = {
      values.map(value => s"$name = $value").mkString("(", " or ", ")")
    }

    def convertNotInToAnd(name: String, values: Seq[String]): String = {
      values.map(value => s"$name != $value").mkString("(", " and ", ")")
    }
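    // e.g. convertInToOr("p", Seq("1", "2"))     => "(p = 1 or p = 2)"
    //      convertNotInToAnd("p", Seq("1", "2")) => "(p != 1 and p != 2)"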

    def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
      case Literal(null, _) => true
      case _ => false
    }

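    // Backed by the SQL confs spark.sql.hive.advancedPartitionPredicatePushdown.enabled
    // and spark.sql.hive.metastorePartitionPruningInSetThreshold, respectively.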
    val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
    val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold

    /**
     * An extractor that strips safe integral up-casts, e.g. `cast(intAttr as bigint)`,
     * and returns the underlying attribute; other wrapping expressions do not match.
     */
    object ExtractAttribute {
      @scala.annotation.tailrec
      def unapply(expr: Expression): Option[Attribute] = {
        expr match {
          case attr: Attribute => Some(attr)
          case Cast(child @ IntegralTypeExpression(), dt: IntegralType, _, _)
              if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child)
          case _ => None
        }
      }
    }

    def convert(expr: Expression): Option[String] = expr match {
      // A NOT IN over a large set cannot reuse the min/max range rewrite below:
      // negating a widened range would drop matching partitions.
      case Not(InSet(_, values)) if values.size > inSetThreshold =>
        None

      // A NOT IN whose list contains NULL never evaluates to true, so rewriting it
      // with convertNotInToAnd would be incorrect; skip pushdown instead.
      case Not(In(_, list)) if hasNullLiteral(list) => None
      case Not(InSet(_, list)) if list.contains(null) => None

      case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
          if useAdvanced =>
        Some(convertInToOr(name, values))

      case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
          if useAdvanced =>
        Some(convertNotInToAnd(name, values))

      case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
        val dataType = child.dataType
        // Skipping nulls here is safe; see the discussion in ExtractableLiterals.
        // Rewrite the large IN set as a range check on its min/max values. This selects
        // a superset of the matching partitions, which is safe because Spark re-prunes
        // the returned partitions against the original predicates.
        val sortedValues = values.filter(_ != null).toSeq
          .sorted(TypeUtils.getInterpretedOrdering(dataType))
        convert(And(GreaterThanOrEqual(child, Literal(sortedValues.head, dataType)),
          LessThanOrEqual(child, Literal(sortedValues.last, dataType))))

      case InSet(child @ ExtractAttribute(SupportedAttribute(name)), ExtractableDateValues(values))
          if useAdvanced && child.dataType == DateType =>
        Some(convertInToOr(name, values))

      case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
        ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
        Some(convertNotInToAnd(name, values))

      case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
          if useAdvanced =>
        Some(convertInToOr(name, values))

      case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
          if useAdvanced =>
        Some(convertNotInToAnd(name, values))

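      // e.g. GreaterThan(intAttr, Literal(1)) is emitted as "intAttr > 1", using
      // the comparison operator's own symbol.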
      case op @ SpecialBinaryComparison(
          ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
        Some(s"$name ${op.symbol} $value")

      case op @ SpecialBinaryComparison(
          ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) =>
        Some(s"$value ${op.symbol} $name")

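      // In the three LIKE rewrites below, `value` already carries its surrounding
      // double quotes (see quoteStringLiteral), so the drop/dropRight splicing puts
      // the wildcard inside the quotes: Contains => "%v%", StartsWith => "v%",
      // EndsWith => "%v". `wildcard` is defined elsewhere in HiveShim.scala.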
      case Contains(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
        Some(s"$name like " + (("\"" + wildcard + value.drop(1)).dropRight(1) + wildcard + "\""))

      case StartsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
        Some(s"$name like " + (value.dropRight(1) + wildcard + "\""))

      case EndsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
        Some(s"$name like " + ("\"" + wildcard + value.drop(1)))

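      // For AND, it is safe to push down whichever conjunct converts (the metastore
      // then returns a superset of the matching partitions); for OR, both branches
      // must convert, since pushing only one would drop matching partitions.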
      case And(expr1, expr2) if useAdvanced =>
        val converted = convert(expr1) ++ convert(expr2)
        if (converted.isEmpty) {
          None
        } else {
          Some(converted.mkString("(", " and ", ")"))
        }

      case Or(expr1, expr2) if useAdvanced =>
        for {
          left <- convert(expr1)
          right <- convert(expr2)
        } yield s"($left or $right)"

      case Not(EqualTo(
          ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value))) if useAdvanced =>
        Some(s"$name != $value")

      case Not(EqualTo(
          ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name)))) if useAdvanced =>
        Some(s"$value != $name")

      case _ => None
    }

    filters.flatMap(convert).mkString(" and ")
  }
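
A minimal usage sketch, modeled on Spark's own FiltersSuite (it assumes code living in
the org.apache.spark.sql.hive.client package, where the package-private Shim_v0_13
class hosting convertFilters is visible; the table setup and expected strings are
illustrative assumptions, not taken from this file):

  import scala.jdk.CollectionConverters._

  import org.apache.hadoop.hive.metastore.api.FieldSchema

  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.types._

  // A bare Hive table with a single string partition column `ds`.
  val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
  testTable.setPartCols(Seq(new FieldSchema("ds", "string", "")).asJava)

  val ds = AttributeReference("ds", StringType)()
  val shim = new Shim_v0_13

  // A plain comparison converts via SpecialBinaryComparison:
  shim.convertFilters(testTable, Seq(EqualTo(ds, Literal("2024-01-01"))))
  // => ds = "2024-01-01"

  // IN becomes a disjunction (advanced pushdown is enabled by default):
  shim.convertFilters(testTable, Seq(In(ds, Seq(Literal("a"), Literal("b")))))
  // => (ds = "a" or ds = "b")

  // Unsupported predicates are dropped rather than mistranslated:
  shim.convertFilters(testTable, Seq(EqualNullSafe(ds, Literal("a"))))
  // => "" (empty string: nothing is pushed down)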