in paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala [49:159]
/**
 * Converts a Spark predicate into the corresponding [[Predicate]].
 *
 * Dispatch is driven by `sparkPredicate.name()`:
 *  - comparison, `IN`, null-check and string-match predicates are decomposed by the
 *    `BinaryPredicate` / `MultiPredicate` / `UnaryPredicate` extractors, the field name is
 *    resolved with `fieldIndex`, literals go through `convertLiteral`, and the result is
 *    built with `builder`;
 *  - `AND` / `OR` convert both children recursively and combine them with `PredicateBuilder`;
 *  - `NOT` converts the child and negates the result.
 *
 * @param sparkPredicate the Spark predicate to translate
 * @return the translated predicate
 * @throws UnsupportedOperationException if the predicate name is not handled, if the
 *         predicate's operands cannot be decomposed by the matching extractor, or if the
 *         converted child of a `NOT` cannot be negated
 */
private def convert(sparkPredicate: SparkPredicate): Predicate = {
  // Single failure path: every "cannot convert" situation raises the same exception type with
  // the same message, instead of leaking a scala.MatchError or ClassCastException.
  def unsupported(): Nothing =
    throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")

  sparkPredicate.name() match {
    case EQUAL_TO =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          // TODO deal with isNaN
          val index = fieldIndex(fieldName)
          builder.equal(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case EQUAL_NULL_SAFE =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          // A null-safe comparison against a null literal is expressed as an IS NULL check.
          if (literal == null) {
            builder.isNull(index)
          } else {
            builder.equal(index, convertLiteral(index, literal))
          }
        case _ => unsupported()
      }

    case GREATER_THAN =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.greaterThan(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case GREATER_THAN_OR_EQUAL =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.greaterOrEqual(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case LESS_THAN =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.lessThan(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case LESS_THAN_OR_EQUAL =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.lessOrEqual(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case IN =>
      MultiPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literals)) =>
          val index = fieldIndex(fieldName)
          // Convert the literals exactly once and hand the Java list to the builder.
          builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
        case _ => unsupported()
      }

    case IS_NULL =>
      UnaryPredicate.unapply(sparkPredicate) match {
        case Some(fieldName) =>
          builder.isNull(fieldIndex(fieldName))
        case _ => unsupported()
      }

    case IS_NOT_NULL =>
      UnaryPredicate.unapply(sparkPredicate) match {
        case Some(fieldName) =>
          builder.isNotNull(fieldIndex(fieldName))
        case _ => unsupported()
      }

    case AND =>
      sparkPredicate match {
        case and: And =>
          PredicateBuilder.and(convert(and.left()), convert(and.right()))
        case _ => unsupported()
      }

    case OR =>
      sparkPredicate match {
        case or: Or =>
          PredicateBuilder.or(convert(or.left()), convert(or.right()))
        case _ => unsupported()
      }

    case NOT =>
      sparkPredicate match {
        case not: Not =>
          // negate() yields an empty Optional when the converted child has no negation.
          val negated = convert(not.child()).negate()
          if (negated.isPresent) {
            negated.get()
          } else {
            unsupported()
          }
        case _ => unsupported()
      }

    case STRING_START_WITH =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.startsWith(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case STRING_END_WITH =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.endsWith(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    case STRING_CONTAINS =>
      BinaryPredicate.unapply(sparkPredicate) match {
        case Some((fieldName, literal)) =>
          val index = fieldIndex(fieldName)
          builder.contains(index, convertLiteral(index, literal))
        case _ => unsupported()
      }

    // TODO: AlwaysTrue, AlwaysFalse
    case _ => unsupported()
  }
}