in spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala [895:1034]
/**
 * Translates a Spark data source [[sources.Filter]] into a Comet native protobuf expression.
 *
 * Translation rules:
 *   - Leaf predicates are translated only when their guard (`canMakeFilterOn`, plus the extra
 *     conditions noted below) accepts them and the referenced column can be resolved through
 *     `createNameExpr`.
 *   - Predicates that carry a literal also need that literal to be convertible, via
 *     `createValueExpr`, to the column's data type.
 *   - `Not(EqualTo)` and `Not(EqualNullSafe)` become dedicated inequality operators when their
 *     guard passes. Any other `Not` wraps its translated child.
 *   - `And` and `Or` translate both children and succeed only if both succeed.
 *   - Range comparisons (`<`, `<=`, `>`, `>=`) are skipped for null literals.
 *   - `In` requires `pushDownInFilterThreshold > 0`, a non-empty value list, and every value to
 *     convert.
 *   - String predicates require `pushDownStringPredicate`.
 *
 * @param predicate the Spark filter to translate
 * @return the equivalent native expression, or `None` when the filter (or any required
 *         sub-filter) cannot be pushed down
 */
private def createNativeFilter(predicate: sources.Filter): Option[ExprOuterClass.Expr] = {
  type UnarySetter =
    (ExprOuterClass.Expr.Builder, ExprOuterClass.UnaryExpr) => ExprOuterClass.Expr.Builder
  type BinarySetter =
    (ExprOuterClass.Expr.Builder, ExprOuterClass.BinaryExpr) => ExprOuterClass.Expr.Builder

  // Applies a unary operator to `column`. Yields None if the column cannot be resolved.
  def onColumn(column: String)(setter: UnarySetter): Option[ExprOuterClass.Expr] =
    createNameExpr(column, dataSchema).map { case (_, columnExpr) =>
      createUnaryExpr(columnExpr, setter)
    }

  // Applies a binary operator to (column, literal). The literal is converted using the
  // resolved column's data type, so None results if either conversion fails.
  def onColumnAndLiteral(column: String, literal: Any)(
      setter: BinarySetter): Option[ExprOuterClass.Expr] =
    for {
      (dataType, columnExpr) <- createNameExpr(column, dataSchema)
      literalExpr <- createValueExpr(literal, dataType)
    } yield createBinaryExpr(columnExpr, literalExpr, setter)

  // Joins two sub-filters with a logical connective. Both children are always translated,
  // left first. The result is None unless both translations succeed.
  def onChildren(lhs: sources.Filter, rhs: sources.Filter)(
      setter: BinarySetter): Option[ExprOuterClass.Expr] = {
    val left = createNativeFilter(lhs)
    val right = createNativeFilter(rhs)
    for {
      leftExpr <- left
      rightExpr <- right
    } yield createBinaryExpr(leftExpr, rightExpr, setter)
  }

  // Builds a non-negated native IN expression. Yields None if the column cannot be resolved
  // or if any value fails to convert to the column's data type.
  def inList(column: String, values: Array[Any]): Option[ExprOuterClass.Expr] =
    createNameExpr(column, dataSchema).flatMap { case (dataType, columnExpr) =>
      val literalExprs = values.flatMap(createValueExpr(_, dataType))
      if (literalExprs.length == values.length) {
        val inBuilder = ExprOuterClass.In
          .newBuilder()
          .setInValue(columnExpr)
          .addAllLists(literalExprs.toSeq.asJava)
          .setNegated(false)
        Some(ExprOuterClass.Expr.newBuilder().setIn(inBuilder).build())
      } else {
        None
      }
    }

  // Case order matters: the Not(EqualTo) / Not(EqualNullSafe) specialisations must be tried
  // before the generic Not(pred) fallback.
  predicate match {
    case sources.IsNull(name) if canMakeFilterOn(name, null) =>
      onColumn(name)(_.setIsNull(_))
    case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
      onColumn(name)(_.setIsNotNull(_))

    case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setEq(_))
    case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setNeq(_))
    case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setEqNullSafe(_))
    case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setNeqNullSafe(_))

    // Range comparisons are never pushed down with a null literal.
    case sources.LessThan(name, value) if value != null && canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setLt(_))
    case sources.LessThanOrEqual(name, value)
        if value != null && canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setLtEq(_))
    case sources.GreaterThan(name, value) if value != null && canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setGt(_))
    case sources.GreaterThanOrEqual(name, value)
        if value != null && canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setGtEq(_))

    case sources.And(lhs, rhs) =>
      onChildren(lhs, rhs)(_.setAnd(_))
    case sources.Or(lhs, rhs) =>
      onChildren(lhs, rhs)(_.setOr(_))
    case sources.Not(pred) =>
      createNativeFilter(pred).map { child =>
        createUnaryExpr(child, (builder, unary) => builder.setNot(unary))
      }

    case sources.In(name, values)
        if pushDownInFilterThreshold > 0 && values.nonEmpty &&
          canMakeFilterOn(name, values.head) =>
      inList(name, values)

    case sources.StringStartsWith(name, prefix)
        if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
      onColumnAndLiteral(name, prefix)(_.setStartsWith(_))
    case sources.StringEndsWith(name, suffix)
        if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
      onColumnAndLiteral(name, suffix)(_.setEndsWith(_))
    case sources.StringContains(name, value)
        if pushDownStringPredicate && canMakeFilterOn(name, value) =>
      onColumnAndLiteral(name, value)(_.setContains(_))

    case _ => None
  }
}