static FieldWriter createFieldWriter()

in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/api/table/RowDataWriter.java [16:188]


    /**
     * Builds a {@link FieldWriter} that writes a single field value into one column through the
     * given {@code rowDataWriter}. The write method is selected once, up front, from the JDBC
     * type of the Hologres column, so the per-record path is a single lambda call.
     *
     * <p>The returned writer is null-safe: a {@code null} value is routed to {@code
     * rowDataWriter.writeNull(...)} and never reaches the type-specific writer.
     *
     * @param fieldType Flink logical type of the field; used to read decimal precision/scale, to
     *     detect {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}, and to detect a VARCHAR field that is
     *     mapped to an array column
     * @param hologresType {@link java.sql.Types} constant describing the Hologres column
     * @param hologresTypeName Hologres type name of the column; consulted to tell {@code
     *     "timestamptz"} from other timestamp columns and {@code "json"}/{@code "jsonb"} from
     *     other {@code Types.OTHER} columns
     * @param rowDataWriter target that receives every write call
     * @param columnIndexInHologresTable column index passed to every write call
     * @param arrayDelimiter separator handed to {@link String#split(String)} (hence interpreted
     *     as a regular expression) when a VARCHAR field is written to an array column
     * @param <T> record type of the {@code rowDataWriter}
     * @return a null-safe writer for the column
     * @throws UnsupportedOperationException if a Flink TIMESTAMP_LTZ field targets a Hologres
     *     column whose type name is not {@code "timestamptz"}
     * @throws IllegalArgumentException if {@code hologresType} is not one of the handled JDBC
     *     types
     */
    static <T> FieldWriter createFieldWriter(
            LogicalType fieldType,
            int hologresType,
            String hologresTypeName,
            RowDataWriter<T> rowDataWriter,
            int columnIndexInHologresTable,
            String arrayDelimiter) {
        // Assigned exactly once on every non-throwing path so it can be captured by the
        // null-guarding lambda returned at the end.
        final FieldWriter fieldWriter;
        switch (hologresType) {
            case Types.CHAR:
            case Types.VARCHAR:
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeString(
                                        (StringData) obj, columnIndexInHologresTable);
                break;
            case Types.BIT:
            case Types.BOOLEAN:
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeBoolean(
                                        (Boolean) obj, columnIndexInHologresTable);
                break;
            case Types.BINARY:
            case Types.VARBINARY:
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeBinary((byte[]) obj, columnIndexInHologresTable);
                break;
            case Types.NUMERIC:
            case Types.DECIMAL:
                // Resolve precision and scale once here instead of on every record.
                int decimalPrecision = LogicalTypeChecks.getPrecision(fieldType);
                int decimalScale = LogicalTypeChecks.getScale(fieldType);
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeDecimal(
                                        (DecimalData) obj,
                                        columnIndexInHologresTable,
                                        decimalPrecision,
                                        decimalScale);
                break;
            case Types.TINYINT:
                fieldWriter =
                        (obj) -> rowDataWriter.writeByte((Byte) obj, columnIndexInHologresTable);
                break;
            case Types.SMALLINT:
                fieldWriter =
                        (obj) -> {
                            // SMALLINT should be compatible with TINYINT: widen a Byte value
                            // instead of failing on the cast to short.
                            if (obj instanceof Byte) {
                                rowDataWriter.writeShort(
                                        ((Byte) obj).shortValue(), columnIndexInHologresTable);
                            } else {
                                rowDataWriter.writeShort((short) obj, columnIndexInHologresTable);
                            }
                        };
                break;
            case Types.INTEGER:
                fieldWriter =
                        (obj) -> rowDataWriter.writeInt((Integer) obj, columnIndexInHologresTable);
                break;
            case Types.DATE:
                // The date value is handed over as an Integer.
                fieldWriter =
                        (obj) -> rowDataWriter.writeDate((Integer) obj, columnIndexInHologresTable);
                break;
            case Types.BIGINT:
                fieldWriter =
                        (obj) -> rowDataWriter.writeLong((Long) obj, columnIndexInHologresTable);
                break;
            case Types.REAL:
            case Types.FLOAT:
                fieldWriter =
                        (obj) -> rowDataWriter.writeFloat((Float) obj, columnIndexInHologresTable);
                break;
            case Types.DOUBLE:
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeDouble((Double) obj, columnIndexInHologresTable);
                break;
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE:
                boolean isFlinkLTZ =
                        fieldType
                                .getTypeRoot()
                                .equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
                if (hologresTypeName.equals("timestamptz")) {
                    if (isFlinkLTZ) {
                        // flink TIMESTAMP_LTZ -> holo TIMESTAMPTZ
                        fieldWriter =
                                (obj) ->
                                        rowDataWriter.writeLTZAsTimestampTz(
                                                (TimestampData) obj, columnIndexInHologresTable);
                    } else {
                        // flink TIMESTAMP -> holo TIMESTAMPTZ
                        fieldWriter =
                                (obj) ->
                                        rowDataWriter.writeTimestampTz(
                                                (TimestampData) obj, columnIndexInHologresTable);
                    }
                } else {
                    if (isFlinkLTZ) {
                        // flink TIMESTAMP_LTZ -> holo TIMESTAMP is rejected when the writer is
                        // created rather than on the first record.
                        throw new UnsupportedOperationException(
                                "The hologres connector does not support writing flink timestamp_ltz type to hologres timestamp type, please use hologres timestamp with timezone instead.");
                    } else {
                        // flink TIMESTAMP -> holo TIMESTAMP
                        fieldWriter =
                                (obj) ->
                                        rowDataWriter.writeTimestamp(
                                                (TimestampData) obj, columnIndexInHologresTable);
                    }
                }
                break;
            case Types.ARRAY:
                if (fieldType.getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
                    // A flink VARCHAR field written to a holo array column: the text is split on
                    // arrayDelimiter (a regex, per String#split) into the array elements.
                    fieldWriter =
                            (obj) ->
                                    // The other string branches of this method receive string
                                    // values as StringData, so a direct (String) cast would fail
                                    // with ClassCastException; toString() accepts both StringData
                                    // and java.lang.String.
                                    rowDataWriter.writeStringArray(
                                            obj.toString().split(arrayDelimiter),
                                            columnIndexInHologresTable);
                } else {
                    fieldWriter =
                            createArrayFieldWriter(
                                    fieldType,
                                    hologresTypeName,
                                    rowDataWriter,
                                    columnIndexInHologresTable);
                }
                break;
            case Types.OTHER:
                // The type check of jdbc copy writer is relatively strict, only supports
                // java.lang.String but not BinaryStringData.
                if (hologresTypeName.equals("json") || hologresTypeName.equals("jsonb")) {
                    fieldWriter =
                            (obj) ->
                                    rowDataWriter.writeString(
                                            (StringData) obj, columnIndexInHologresTable);
                } else {
                    fieldWriter =
                            (obj) -> rowDataWriter.writeObject(obj, columnIndexInHologresTable);
                }
                break;
            default:
                throw new IllegalArgumentException(
                        String.format(
                                "Hologres sink does not support data type %s for now", fieldType));
        }

        // Wrap the type-specific writer so null values are written as NULL and never hit the
        // casts above.
        return (obj) -> {
            if (obj == null) {
                rowDataWriter.writeNull(columnIndexInHologresTable);
            } else {
                fieldWriter.writeValue(obj);
            }
        };
    }