private static SerializableFunction createInternalConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [129:240]


    /**
     * Creates the converter that maps a {@link BsonValue} to the object produced for the given
     * logical type.
     *
     * <p>The converter is chosen by the type root of {@code type}. Scalar roots delegate to the
     * matching {@code convertToXxx} helper, wrapping the result in {@code TimestampData}, {@code
     * StringData} or {@code DecimalData} where the branch does so; {@code ROW}, {@code ARRAY} and
     * {@code MAP} delegate to their dedicated factory methods.
     *
     * <p>Each scalar converter is an anonymous class with an explicit {@code serialVersionUID}.
     *
     * @param type logical type whose type root selects the converter
     * @return serializable function converting a BSON value for {@code type}
     * @throws UnsupportedOperationException if the type root is {@code MULTISET}, {@code RAW} or
     *     any other root without a dedicated branch
     */
    private static SerializableFunction<BsonValue, Object> createInternalConverter(
            LogicalType type) {
        final SerializableFunction<BsonValue, Object> converter;
        switch (type.getTypeRoot()) {
            case NULL:
                // The input is ignored: this converter always yields null.
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return null;
                            }
                        };
                break;
            case BOOLEAN:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return convertToBoolean(value);
                            }
                        };
                break;
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                // Both roots share the int conversion.
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return convertToInt(value);
                            }
                        };
                break;
            case BIGINT:
            case INTERVAL_DAY_TIME:
                // Both roots share the long conversion.
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return convertToLong(value);
                            }
                        };
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return TimestampData.fromLocalDateTime(
                                        convertToLocalDateTime(value));
                            }
                        };
                break;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return TimestampData.fromInstant(convertToInstant(value));
                            }
                        };
                break;
            case DOUBLE:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return convertToDouble(value);
                            }
                        };
                break;
            case CHAR:
            case VARCHAR:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                final String text = convertToString(value);
                                return StringData.fromString(text);
                            }
                        };
                break;
            case BINARY:
            case VARBINARY:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                return convertToBinary(value);
                            }
                        };
                break;
            case DECIMAL:
                converter =
                        new SerializableFunction<BsonValue, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object apply(BsonValue value) {
                                // Precision and scale come from the declared column type.
                                final DecimalType target = (DecimalType) type;
                                return DecimalData.fromBigDecimal(
                                        convertToBigDecimal(value),
                                        target.getPrecision(),
                                        target.getScale());
                            }
                        };
                break;
            case ROW:
                converter = createRowConverter((RowType) type);
                break;
            case ARRAY:
                converter = createArrayConverter((ArrayType) type);
                break;
            case MAP:
                converter = createMapConverter((MapType) type);
                break;
            case MULTISET:
            case RAW:
            default:
                // MULTISET and RAW are listed explicitly but rejected like every unhandled root.
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
        return converter;
    }