in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java [104:140]
private Bson renderBinaryComparisonOperator(
String operator, List<ResolvedExpression> expressions) {
Optional<FieldReferenceExpression> fieldReferenceExpr =
extractExpression(expressions, FieldReferenceExpression.class);
Optional<ValueLiteralExpression> fieldValueExpr =
extractExpression(expressions, ValueLiteralExpression.class);
// Nested complex expressions are not supported. e.g. f1 = (f2 > 2)
if (!fieldReferenceExpr.isPresent() || !fieldValueExpr.isPresent()) {
return Filters.empty();
}
String fieldName = visit(fieldReferenceExpr.get()).getValue();
BsonValue fieldValue = visit(fieldValueExpr.get());
// Unsupported values
if (fieldValue.getBsonType() == BsonType.UNDEFINED) {
return Filters.empty();
}
switch (operator) {
case "$eq":
return Filters.eq(fieldName, fieldValue);
case "$lt":
return Filters.lt(fieldName, fieldValue);
case "$lte":
return Filters.lte(fieldName, fieldValue);
case "$gt":
return Filters.gt(fieldName, fieldValue);
case "$gte":
return Filters.gte(fieldName, fieldValue);
case "$ne":
return Filters.ne(fieldName, fieldValue);
default:
return Filters.empty();
}
}