in spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala [680:884]
private def createFilterHelper(
predicate: sources.Filter,
canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
// which is implicitly cast to `false`. Please refer to the `eval` method of these
// operators and the `PruneFilters` rule for details.
// Hyukjin:
// I mapped [[EqualNullSafe]] to [[org.apache.parquet.filter2.predicate.Operators.Eq]], so it
// performs the same equality comparison as when the given [[sources.Filter]] is [[EqualTo]].
// I did this because the actual Parquet filter performs a null-safe equality comparison.
// Maybe [[EqualTo]] should be changed as well, but it still seems fine, because physical
// planning never passes `NULL` to [[EqualTo]]; it rewrites such predicates to [[IsNull]]
// and the like. If I missed something, this should obviously be changed.
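// For example (illustrative only): on an integer column `x`,
//   sources.EqualNullSafe("x", 5) converts to eq(x, 5), the same as sources.EqualTo("x", 5);
//   sources.IsNull("x") converts to eq(x, null),
// since Parquet's eq with a null reference value matches exactly the null entries
// (which is also how the IsNull case below is implemented).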
predicate match {
case sources.IsNull(name) if canMakeFilterOn(name, null) =>
makeEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, null))
case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
makeNotEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, null))
case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
makeEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
makeNotEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
makeEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
makeNotEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.LessThan(name, value) if (value != null) && canMakeFilterOn(name, value) =>
makeLt
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.LessThanOrEqual(name, value)
if (value != null) && canMakeFilterOn(name, value) =>
makeLtEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.GreaterThan(name, value) if (value != null) && canMakeFilterOn(name, value) =>
makeGt
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.GreaterThanOrEqual(name, value)
if (value != null) && canMakeFilterOn(name, value) =>
makeGtEq
.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, value))
case sources.And(lhs, rhs) =>
// Here it is not safe to just convert one side and drop the other side
// when we do not know what the parent filters are.
//
// Here is an example used to explain the reason.
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
// NOT(a = 2), which will generate wrong results.
//
// Pushing down only one side of an AND is safe only at the top level, or inside nested
// ANDs before any NOT or OR condition is encountered; in those cases, the unsupported
// predicate can be safely removed.
val lhsFilterOption =
createFilterHelper(lhs, canPartialPushDownConjuncts)
val rhsFilterOption =
createFilterHelper(rhs, canPartialPushDownConjuncts)
(lhsFilterOption, rhsFilterOption) match {
case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter)
case _ => None
}
case sources.Or(lhs, rhs) =>
// The Or predicate is convertible only when both of its children can be pushed down.
// That said, if one or both of the children can be partially pushed down, the Or
// predicate can be partially pushed down as well.
//
// Here is an example used to explain the reason.
// Let's say we have
// (a1 AND a2) OR (b1 AND b2),
// where a1 and b1 are convertible, while a2 and b2 are not.
// The predicate can be converted as
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
// As per the logic in the And case above, we can push down (a1 OR b1).
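// Concretely, with lhs = And(a1, a2) and rhs = And(b1, b2) where only a1 and b1 are
// convertible, the recursive calls below return Some(a1) and Some(b1) (partial pushdown
// inside each And, assuming canPartialPushDownConjuncts holds), and the for-comprehension
// yields FilterApi.or(a1, b1).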
for {
lhsFilter <- createFilterHelper(lhs, canPartialPushDownConjuncts)
rhsFilter <- createFilterHelper(rhs, canPartialPushDownConjuncts)
} yield FilterApi.or(lhsFilter, rhsFilter)
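// Under a NOT, partial pushdown must be disabled: converting only part of `pred` yields
// a weaker predicate, and NOT of a weaker predicate is stronger than NOT of the
// original, so it could drop rows that the original filter would keep.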
case sources.Not(pred) =>
createFilterHelper(pred, canPartialPushDownConjuncts = false)
.map(FilterApi.not)
case sources.In(name, values)
if pushDownInFilterThreshold > 0 && values.nonEmpty &&
canMakeFilterOn(name, values.head) =>
val fieldType = nameToParquetField(name).fieldType
val fieldNames = nameToParquetField(name).fieldNames
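// A short IN list expands into a chain of equality checks; e.g. (hypothetical values,
// threshold >= 3) `x IN (1, 2, 3)` becomes or(or(eq(x, 1), eq(x, 2)), eq(x, 3)).
// Longer lists fall back to Parquet's native in-predicate; any null in the list is
// split out into a separate eq(x, null), since the in-predicate covers only the
// non-null values.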
if (values.length <= pushDownInFilterThreshold) {
values.distinct
.flatMap { v =>
makeEq.lift(fieldType).map(_(fieldNames, v))
}
.reduceLeftOption(FilterApi.or)
} else if (canPartialPushDownConjuncts) {
val primitiveType = schema.getColumnDescription(fieldNames).getPrimitiveType
val statistics: ParquetStatistics[_] = ParquetStatistics.createStats(primitiveType)
if (values.contains(null)) {
Seq(
makeEq.lift(fieldType).map(_(fieldNames, null)),
makeInPredicate
.lift(fieldType)
.map(_(fieldNames, values.filter(_ != null), statistics))).flatten
.reduceLeftOption(FilterApi.or)
} else {
makeInPredicate.lift(fieldType).map(_(fieldNames, values, statistics))
}
} else {
None
}
case sources.StringStartsWith(name, prefix)
if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
Option(prefix).map { v =>
FilterApi.userDefined(
binaryColumn(nameToParquetField(name).fieldNames),
new UserDefinedPredicate[Binary] with Serializable {
private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
private val size = strToBinary.length
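// A row group can be dropped when, comparing only the first `size` bytes, even its
// largest value sorts strictly below the prefix (max < prefix) or even its smallest
// value sorts strictly above it (min > prefix): either way, no value in the group can
// start with the prefix.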
override def canDrop(statistics: Statistics[Binary]): Boolean = {
val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
val max = statistics.getMax
val min = statistics.getMin
comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 ||
comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0
}
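// For the negated predicate, the group can be dropped only when every value starts
// with the prefix, i.e. both the truncated min and the truncated max equal it.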
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
val max = statistics.getMax
val min = statistics.getMin
comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) == 0 &&
comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) == 0
}
override def keep(value: Binary): Boolean = {
value != null && UTF8String
.fromBytes(value.getBytes)
.startsWith(UTF8String.fromBytes(strToBinary.getBytes))
}
})
}
case sources.StringEndsWith(name, suffix)
if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
Option(suffix).map { v =>
FilterApi.userDefined(
binaryColumn(nameToParquetField(name).fieldNames),
new UserDefinedPredicate[Binary] with Serializable {
private val suffixStr = UTF8String.fromString(v)
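// Min/max statistics say nothing about suffixes, so no row group can be safely
// dropped; filtering happens per record via `keep` only.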
override def canDrop(statistics: Statistics[Binary]): Boolean = false
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
override def keep(value: Binary): Boolean = {
value != null && UTF8String.fromBytes(value.getBytes).endsWith(suffixStr)
}
})
}
case sources.StringContains(name, value)
if pushDownStringPredicate && canMakeFilterOn(name, value) =>
Option(value).map { v =>
FilterApi.userDefined(
binaryColumn(nameToParquetField(name).fieldNames),
new UserDefinedPredicate[Binary] with Serializable {
private val subStr = UTF8String.fromString(v)
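// As with StringEndsWith, min/max statistics cannot prune substring matches, so
// canDrop/inverseCanDrop conservatively return false.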
override def canDrop(statistics: Statistics[Binary]): Boolean = false
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
override def keep(value: Binary): Boolean = {
value != null && UTF8String.fromBytes(value.getBytes).contains(subStr)
}
})
}
case _ => None
}
}