in sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala [645:862]
def convertFilters(table: Table, filters: Seq[Expression]): String = {
lazy val dateFormatter = DateFormatter()
/**
* An extractor that matches all binary comparison operators except null-safe
* equality.
*
* Null-safe equality is not supported by Hive metastore partition predicate
* pushdown.
*/
object SpecialBinaryComparison {
def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
case _: EqualNullSafe => None
case _ => Some((e.left, e.right))
}
}
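// For example, `p = 1` and `p > 1` match this extractor, while `p <=> 1`
// (null-safe equality) is rejected and therefore never pushed down.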
object ExtractableLiteral {
def unapply(expr: Expression): Option[String] = expr match {
case Literal(null, _) => None // `null`s can be cast to other types; we want to avoid NPEs.
case Literal(value, _: IntegralType) => Some(value.toString)
case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString))
case Literal(value, _: DateType) =>
Some(quoteStringLiteral(dateFormatter.format(value.asInstanceOf[Int])))
case _ => None
}
}
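// Illustrative conversions, assuming `quoteStringLiteral` (defined elsewhere in
// this file) wraps the value in double quotes:
//   Literal(20, IntegerType)    -> 20
//   Literal("2021", StringType) -> "2021"
//   Literal(18000, DateType)    -> the quoted yyyy-MM-dd rendering of epoch day 18000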
object ExtractableLiterals {
def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
// SPARK-24879: The Hive metastore filter parser does not support "null", but we
// still want to push down as many predicates as we can while maintaining correctness.
// In SQL, the `IN` expression evaluates as follows:
// > `1 in (2, NULL)` -> NULL
// > `1 in (1, NULL)` -> true
// > `1 in (2)` -> false
// Since Hive metastore filters are NULL-intolerant binary operations joined only by
// `AND` and `OR`, we can treat `NULL` as `false` and thus rewrite `1 in (2, NULL)` as
// `1 in (2)`.
// If the Hive metastore begins supporting NULL-tolerant predicates and Spark starts
// pushing down these predicates, then this optimization will become incorrect and need
// to be changed.
val extractables = exprs
.filter {
case Literal(null, _) => false
case _ => true
}.map(ExtractableLiteral.unapply)
if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
Some(extractables.map(_.get))
} else {
None
}
}
}
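// For example, under the NULL-as-false rewrite described above, the value list
// of `p IN (1, NULL)` is extracted as Seq("1"), so the whole predicate can still
// be pushed down as `(p = 1)`.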
object ExtractableValues {
private lazy val valueToLiteralString: PartialFunction[Any, String] = {
case value: Byte => value.toString
case value: Short => value.toString
case value: Int => value.toString
case value: Long => value.toString
case value: UTF8String => quoteStringLiteral(value.toString)
}
def unapply(values: Set[Any]): Option[Seq[String]] = {
val extractables = values.filter(_ != null).toSeq.map(valueToLiteralString.lift)
if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
Some(extractables.map(_.get))
} else {
None
}
}
}
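// For example, Set(1, 2, null) extracts to Seq("1", "2"), while a set holding a
// value of any other runtime type (e.g. a Double) yields None and blocks pushdown.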
object ExtractableDateValues {
private lazy val valueToLiteralString: PartialFunction[Any, String] = {
case value: Int => quoteStringLiteral(dateFormatter.format(value))
}
def unapply(values: Set[Any]): Option[Seq[String]] = {
val extractables = values.filter(_ != null).toSeq.map(valueToLiteralString.lift)
if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
Some(extractables.map(_.get))
} else {
None
}
}
}
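// Date partition values arrive as days-since-epoch Ints, so e.g. Set(18000)
// extracts to the quoted yyyy-MM-dd string for epoch day 18000.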
object SupportedAttribute {
// Hive varchar and char are treated as catalyst strings, but neither can be pushed down.
private val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet
def unapply(attr: Attribute): Option[String] = {
val resolver = SQLConf.get.resolver
if (varcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType ||
attr.dataType == DateType) {
Some(attr.name)
} else {
None
}
}
}
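// Only integral, string and date partition columns are eligible: a partition key
// declared varchar(10) or char(4) in Hive is excluded here even though Spark
// treats its values as catalyst strings.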
def convertInToOr(name: String, values: Seq[String]): String = {
values.map(value => s"$name = $value").mkString("(", " or ", ")")
}
def convertNotInToAnd(name: String, values: Seq[String]): String = {
values.map(value => s"$name != $value").mkString("(", " and ", ")")
}
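// e.g. convertInToOr("p", Seq("1", "2"))     == "(p = 1 or p = 2)"
//      convertNotInToAnd("p", Seq("1", "2")) == "(p != 1 and p != 2)"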
def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
case Literal(null, _) => true
case _ => false
}
val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold
object ExtractAttribute {
@scala.annotation.tailrec
def unapply(expr: Expression): Option[Attribute] = {
expr match {
case attr: Attribute => Some(attr)
case Cast(child @ IntegralTypeExpression(), dt: IntegralType, _, _)
if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child)
case _ => None
}
}
}
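// e.g. `cast(intCol as bigint) = 1L` lets ExtractAttribute unwrap to `intCol`,
// since int -> bigint is a loss-free up-cast; a narrowing cast such as
// `cast(bigintCol as int)` is not matched.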
def convert(expr: Expression): Option[String] = expr match {
case Not(InSet(_, values)) if values.size > inSetThreshold =>
None
case Not(In(_, list)) if hasNullLiteral(list) => None
case Not(InSet(_, list)) if list.contains(null) => None
case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
if useAdvanced =>
Some(convertInToOr(name, values))
case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
if useAdvanced =>
Some(convertNotInToAnd(name, values))
case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
val dataType = child.dataType
// Skipping nulls here is safe; see ExtractableLiterals for the detailed reasoning.
val sortedValues = values.filter(_ != null).toSeq
.sorted(TypeUtils.getInterpretedOrdering(dataType))
convert(And(GreaterThanOrEqual(child, Literal(sortedValues.head, dataType)),
LessThanOrEqual(child, Literal(sortedValues.last, dataType))))
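// Illustrative: with a threshold of 3, `p IN (5, 1, 9, 2)` is rewritten to the
// range predicate `(p >= 1 and p <= 9)` rather than a four-way OR chain; the
// range selects a superset of the matching partitions, which is safe.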
case InSet(child @ ExtractAttribute(SupportedAttribute(name)), ExtractableDateValues(values))
if useAdvanced && child.dataType == DateType =>
Some(convertInToOr(name, values))
case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
Some(convertNotInToAnd(name, values))
case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
if useAdvanced =>
Some(convertInToOr(name, values))
case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
if useAdvanced =>
Some(convertNotInToAnd(name, values))
case op @ SpecialBinaryComparison(
ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name ${op.symbol} $value")
case op @ SpecialBinaryComparison(
ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) =>
Some(s"$value ${op.symbol} $name")
case Contains(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name like " + (("\"" + wildcard + value.drop(1)).dropRight(1) + wildcard + "\""))
case StartsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name like " + (value.dropRight(1) + wildcard + "\""))
case EndsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name like " + ("\"" + wildcard + value.drop(1)))
case And(expr1, expr2) if useAdvanced =>
val converted = convert(expr1) ++ convert(expr2)
if (converted.isEmpty) {
None
} else {
Some(converted.mkString("(", " and ", ")"))
}
case Or(expr1, expr2) if useAdvanced =>
for {
left <- convert(expr1)
right <- convert(expr2)
} yield s"($left or $right)"
case Not(EqualTo(
ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value))) if useAdvanced =>
Some(s"$name != $value")
case Not(EqualTo(
ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name)))) if useAdvanced =>
Some(s"$value != $name")
case _ => None
}
filters.flatMap(convert).mkString(" and ")
}
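// Illustrative end-to-end sketch (hypothetical partition columns p: int and
// q: string, with advanced pushdown enabled):
//   convertFilters(table, Seq(EqualTo(p, Literal(1)), In(q, Seq(Literal("a")))))
//     returns the metastore filter string: p = 1 and (q = "a")
// Filters that cannot be converted are silently dropped from the conjunction;
// the metastore then returns a superset of partitions that Spark re-filters
// with the exact predicates.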