private static SerializableFunction createInternalConverter()

in seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java [103:254]


    /**
     * Creates the converter that maps a SeaTunnel field value of the given type to the
     * corresponding {@link BsonValue}.
     *
     * <p>Except for the {@code NULL} type, the converters created directly in this method cast
     * or dereference the incoming value without a null check, so they must only be invoked with
     * a non-null value of the Java class matching {@code type}.
     *
     * @param type the SeaTunnel data type of the field to convert
     * @return a serializable function converting a field value to its BSON representation
     * @throws MongodbConnectorException if the SQL type of {@code type} is not handled here
     */
    private static SerializableFunction<Object, BsonValue> createInternalConverter(
            SeaTunnelDataType<?> type) {
        switch (type.getSqlType()) {
            case NULL:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        // The input is ignored: a NULL-typed field is always BSON null.
                        return BsonNull.VALUE;
                    }
                };
            case BOOLEAN:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonBoolean((boolean) value);
                    }
                };
            case TINYINT:
            case SMALLINT:
            case INT:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        final int intValue;
                        if (value instanceof Byte) {
                            // Sign-preserving widening: masking with 0xFF would turn negative
                            // bytes into 128..255 and store a different number than the source.
                            intValue = ((Byte) value).intValue();
                        } else if (value instanceof Short) {
                            intValue = ((Short) value).intValue();
                        } else {
                            intValue = (int) value;
                        }
                        return new BsonInt32(intValue);
                    }
                };
            case BIGINT:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonInt64((long) value);
                    }
                };
            case FLOAT:
            case DOUBLE:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        // Both FLOAT and DOUBLE are written as a BSON double; a Float is widened.
                        double v =
                                value instanceof Float
                                        ? ((Float) value).doubleValue()
                                        : (double) value;
                        return new BsonDouble(v);
                    }
                };
            case STRING:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        String val = value.toString();
                        // Try to parse out the MongoDB specific data type from extended JSON:
                        // only strings that look like a JSON object mentioning the encode field
                        // are worth the cost of parsing.
                        if (val.startsWith("{")
                                && val.endsWith("}")
                                && val.contains(ENCODE_VALUE_FIELD)) {
                            try {
                                BsonDocument doc = BsonDocument.parse(val);
                                if (doc.containsKey(ENCODE_VALUE_FIELD)) {
                                    return doc.get(ENCODE_VALUE_FIELD);
                                }
                            } catch (JsonParseException ignored) {
                                // Invalid JSON format: fall through and store as a BSON string.
                            }
                        }
                        return new BsonString(val);
                    }
                };
            case BYTES:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        return new BsonBinary((byte[]) value);
                    }
                };
            case DATE:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        LocalDate localDate = (LocalDate) value;
                        // Stored as the epoch millis of midnight in the JVM default time zone.
                        return new BsonDateTime(
                                localDate
                                        .atStartOfDay(ZoneId.systemDefault())
                                        .toInstant()
                                        .toEpochMilli());
                    }
                };
            case TIMESTAMP:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        LocalDateTime localDateTime = (LocalDateTime) value;
                        // The local date-time is interpreted in the JVM default time zone.
                        return new BsonDateTime(
                                localDateTime
                                        .atZone(ZoneId.systemDefault())
                                        .toInstant()
                                        .toEpochMilli());
                    }
                };
            case DECIMAL:
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        DecimalType decimalType = (DecimalType) type;
                        BigDecimal decimalVal = (BigDecimal) value;
                        // requireNonNull turns a null result of fromBigDecimal into an
                        // immediate NullPointerException instead of passing null to Decimal128.
                        return new BsonDecimal128(
                                new Decimal128(
                                        Objects.requireNonNull(
                                                fromBigDecimal(
                                                        decimalVal,
                                                        decimalType.getPrecision(),
                                                        decimalType.getScale()))));
                    }
                };
            case ARRAY:
                return createArrayConverter((ArrayType<?, ?>) type);
            case MAP:
                MapType<?, ?> mapType = (MapType<?, ?>) type;
                return createMapConverter(
                        mapType.toString(), mapType.getKeyType(), mapType.getValueType());
            case ROW:
                return createRowConverter((SeaTunnelRowType) type);
            default:
                throw new MongodbConnectorException(
                        UNSUPPORTED_DATA_TYPE, "Not support to parse type: " + type);
        }
    }