in spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java [127:310]
/**
 * Converts a Spark V2 {@link Predicate} into an Iceberg {@link Expression}.
 *
 * <p>The predicate name is looked up in {@code FILTERS}; predicates whose name is not registered,
 * or whose operands cannot be translated, are reported as not convertible by returning {@code
 * null}. {@code NOT}, {@code AND} and {@code OR} are converted recursively and are only
 * convertible when every nested predicate is convertible.
 *
 * <p>Equality ({@code EQ}) and {@code NOT_EQ} predicates that compare against a null value are
 * rejected through {@code Preconditions.checkNotNull}; the null-safe equality variant shares the
 * {@code EQ} operation but skips that check.
 *
 * @param predicate the Spark predicate to convert
 * @return the equivalent Iceberg expression, or {@code null} if the predicate cannot be converted
 */
public static Expression convert(Predicate predicate) {
  Operation op = FILTERS.get(predicate.name());
  if (op != null) {
    switch (op) {
      case TRUE:
        return Expressions.alwaysTrue();

      case FALSE:
        return Expressions.alwaysFalse();

      case IS_NULL:
        if (canConvertToTerm(child(predicate))) {
          UnboundTerm<Object> term = toTerm(child(predicate));
          return term != null ? isNull(term) : null;
        }
        return null;

      case NOT_NULL:
        if (canConvertToTerm(child(predicate))) {
          UnboundTerm<Object> term = toTerm(child(predicate));
          return term != null ? notNull(term) : null;
        }
        return null;

      // For the four ordering comparisons below, the term may be on either side. When the
      // literal is on the left (literal OP term), the comparison is mirrored so that the term
      // always ends up as the first argument of the Iceberg expression.
      case LT:
        if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
          UnboundTerm<Object> term = toTerm(leftChild(predicate));
          return term != null ? lessThan(term, convertLiteral(rightChild(predicate))) : null;
        } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
          UnboundTerm<Object> term = toTerm(rightChild(predicate));
          return term != null ? greaterThan(term, convertLiteral(leftChild(predicate))) : null;
        } else {
          return null;
        }

      case LT_EQ:
        if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
          UnboundTerm<Object> term = toTerm(leftChild(predicate));
          return term != null
              ? lessThanOrEqual(term, convertLiteral(rightChild(predicate)))
              : null;
        } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
          UnboundTerm<Object> term = toTerm(rightChild(predicate));
          return term != null
              ? greaterThanOrEqual(term, convertLiteral(leftChild(predicate)))
              : null;
        } else {
          return null;
        }

      case GT:
        if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
          UnboundTerm<Object> term = toTerm(leftChild(predicate));
          return term != null ? greaterThan(term, convertLiteral(rightChild(predicate))) : null;
        } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
          UnboundTerm<Object> term = toTerm(rightChild(predicate));
          return term != null ? lessThan(term, convertLiteral(leftChild(predicate))) : null;
        } else {
          return null;
        }

      case GT_EQ:
        if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
          UnboundTerm<Object> term = toTerm(leftChild(predicate));
          return term != null
              ? greaterThanOrEqual(term, convertLiteral(rightChild(predicate)))
              : null;
        } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
          UnboundTerm<Object> term = toTerm(rightChild(predicate));
          return term != null
              ? lessThanOrEqual(term, convertLiteral(leftChild(predicate)))
              : null;
        } else {
          return null;
        }

      case EQ: // used for both eq and null-safe-eq
        Pair<UnboundTerm<Object>, Object> eqChildren = predicateChildren(predicate);
        if (eqChildren == null) {
          return null;
        }
        // only the plain equality name is checked; null-safe equality may compare with null
        if (predicate.name().equals(EQ)) {
          // comparison with null in normal equality is always null. this is probably a mistake.
          Preconditions.checkNotNull(
              eqChildren.second(),
              "Expression is always false (eq is not null-safe): %s",
              predicate);
        }
        return handleEqual(eqChildren.first(), eqChildren.second());

      case NOT_EQ:
        Pair<UnboundTerm<Object>, Object> notEqChildren = predicateChildren(predicate);
        if (notEqChildren == null) {
          return null;
        }
        // comparison with null in normal equality is always null. this is probably a mistake.
        Preconditions.checkNotNull(
            notEqChildren.second(),
            "Expression is always false (notEq is not null-safe): %s",
            predicate);
        return handleNotEqual(notEqChildren.first(), notEqChildren.second());

      case IN:
        if (isSupportedInPredicate(predicate)) {
          // child 0 is the term being tested; the remaining children are the candidate values
          UnboundTerm<Object> term = toTerm(childAtIndex(predicate, 0));
          return term != null
              ? in(
                  term,
                  Arrays.stream(predicate.children())
                      .skip(1)
                      .map(val -> convertLiteral(((Literal<?>) val)))
                      .filter(Objects::nonNull)
                      .collect(Collectors.toList()))
              : null;
        } else {
          return null;
        }

      case NOT:
        Not notPredicate = (Not) predicate;
        Predicate childPredicate = notPredicate.child();
        if (childPredicate.name().equals(IN) && isSupportedInPredicate(childPredicate)) {
          UnboundTerm<Object> term = toTerm(childAtIndex(childPredicate, 0));
          if (term == null) {
            return null;
          }
          // infer an extra notNull predicate for Spark NOT IN filters
          // as Iceberg expressions don't follow the 3-value SQL boolean logic
          // col NOT IN (1, 2) in Spark is equal to notNull(col) && notIn(col, 1, 2) in Iceberg
          Expression notIn =
              notIn(
                  term,
                  Arrays.stream(childPredicate.children())
                      .skip(1)
                      .map(val -> convertLiteral(((Literal<?>) val)))
                      .filter(Objects::nonNull)
                      .collect(Collectors.toList()));
          return and(notNull(term), notIn);
        } else if (hasNoInFilter(childPredicate)) {
          // a plain negation is only applied when hasNoInFilter accepts the nested predicate
          Expression child = convert(childPredicate);
          if (child != null) {
            return not(child);
          }
        }
        return null;

      case AND:
        {
          And andPredicate = (And) predicate;
          Expression left = convert(andPredicate.left());
          Expression right = convert(andPredicate.right());
          // both sides must be convertible, otherwise the conjunction is not converted
          if (left != null && right != null) {
            return and(left, right);
          }
          return null;
        }

      case OR:
        {
          Or orPredicate = (Or) predicate;
          Expression left = convert(orPredicate.left());
          Expression right = convert(orPredicate.right());
          // both sides must be convertible, otherwise the disjunction is not converted
          if (left != null && right != null) {
            return or(left, right);
          }
          return null;
        }

      case STARTS_WITH:
        // The prefix must be a literal with a non-null value. Anything else is reported as not
        // convertible instead of failing on the literal conversion or on toString().
        if (isLiteral(rightChild(predicate))) {
          Object prefix = convertLiteral(rightChild(predicate));
          if (prefix != null) {
            // NOTE(review): the left child is passed to toColumnName unchecked, so it is
            // presumably always a column reference here -- confirm against callers.
            String colName = SparkUtil.toColumnName(leftChild(predicate));
            return startsWith(colName, prefix.toString());
          }
        }
        return null;
    }
  }

  // either the predicate name is not registered in FILTERS or no case returned a result
  return null;
}