private static SerializableFunction createInternalConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [126:250]


    private static SerializableFunction<Object, BsonValue> createInternalConverter(
            LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return BsonNull.VALUE;
                    }
                };
            case BOOLEAN:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonBoolean((boolean) value);
                    }
                };
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonInt32((int) value);
                    }
                };
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonInt64((long) value);
                    }
                };
            case DOUBLE:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonDouble((double) value);
                    }
                };
            case DECIMAL:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        BigDecimal decimalVal = ((DecimalData) value).toBigDecimal();
                        return new BsonDecimal128(new Decimal128(decimalVal));
                    }
                };
            case CHAR:
            case VARCHAR:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        String val = value.toString();
                        // try to parse out the mongodb specific data type from extend-json.
                        if (val.startsWith("{")
                                && val.endsWith("}")
                                && val.contains(ENCODE_VALUE_FIELD)) {
                            try {
                                BsonDocument doc = BsonDocument.parse(val);
                                if (doc.containsKey(ENCODE_VALUE_FIELD)) {
                                    return doc.get(ENCODE_VALUE_FIELD);
                                }
                            } catch (JsonParseException e) {
                                // invalid json format, fallback to store as a bson string.
                                return new BsonString(value.toString());
                            }
                        }
                        return new BsonString(value.toString());
                    }
                };
            case BINARY:
            case VARBINARY:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonBinary((byte[]) value);
                    }
                };
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonDateTime(((TimestampData) value).toTimestamp().getTime());
                    }
                };
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonDateTime(((TimestampData) value).getMillisecond());
                    }
                };
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
                return createArrayConverter((ArrayType) type);
            case MAP:
                return createMapConverter((MapType) type);
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }