in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java [183:228]
/**
 * Converts a literal expression into the BSON value used for filter push-down.
 *
 * <p>Only the logical type roots handled in the switch below are translated. Every other type
 * root yields a {@link BsonUndefined}, which marks the literal as unsupported. A literal whose
 * value is absent (an empty {@code Optional} from {@code getValueAs}) yields
 * {@link BsonNull#VALUE}.
 *
 * @param litExp the literal expression to convert
 * @return the BSON form of the literal, {@link BsonNull#VALUE} when no value is present, or a
 *     {@link BsonUndefined} when the literal cannot be represented
 */
public BsonValue visit(ValueLiteralExpression litExp) {
    LogicalType type = litExp.getOutputDataType().getLogicalType();
    Optional<BsonValue> value;
    switch (type.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            value = litExp.getValueAs(String.class).map(BsonString::new);
            break;
        case BOOLEAN:
            value = litExp.getValueAs(Boolean.class).map(BsonBoolean::new);
            break;
        case DECIMAL:
            // Not every decimal literal fits into Decimal128; the helper degrades to
            // "unsupported" instead of letting the conversion throw.
            value = litExp.getValueAs(BigDecimal.class).map(this::toBsonDecimalOrUndefined);
            break;
        case INTEGER:
            value = litExp.getValueAs(Integer.class).map(BsonInt32::new);
            break;
        case BIGINT:
            value = litExp.getValueAs(Long.class).map(BsonInt64::new);
            break;
        case DOUBLE:
            value = litExp.getValueAs(Double.class).map(BsonDouble::new);
            break;
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            // NOTE(review): assuming Timestamp is java.sql.Timestamp, valueOf() interprets the
            // LocalDateTime in the JVM default time zone - confirm this matches the
            // connector's read/write conversion.
            value =
                    litExp.getValueAs(LocalDateTime.class)
                            .map(Timestamp::valueOf)
                            .map(Timestamp::getTime)
                            .map(BsonDateTime::new);
            break;
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            value =
                    litExp.getValueAs(Instant.class)
                            .map(Instant::toEpochMilli)
                            .map(BsonDateTime::new);
            break;
        default:
            // Use BsonUndefined to represent unsupported values.
            value = Optional.of(new BsonUndefined());
            break;
    }
    // An empty Optional means the literal carried no value of the requested class.
    return value.orElse(BsonNull.VALUE);
}

/**
 * Converts a decimal literal to a BSON decimal, or to {@link BsonUndefined} when the value
 * cannot be held by a {@link Decimal128}.
 *
 * @param decimal the literal value to convert
 * @return a {@link BsonDecimal128} wrapping the value, or a {@link BsonUndefined} if the
 *     conversion is rejected
 */
private BsonValue toBsonDecimalOrUndefined(BigDecimal decimal) {
    try {
        return new BsonDecimal128(new Decimal128(decimal));
    } catch (NumberFormatException ignored) {
        // The Decimal128 constructor rejects values it cannot represent exactly. Report the
        // literal as unsupported rather than failing the whole conversion.
        return new BsonUndefined();
    }
}