static SerializationConverter createExternalConverter()

in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseRowConvert.java [76:134]


    static SerializationConverter createExternalConverter(DataType type, ZoneId pipelineZoneId) {
        switch (type.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                return (index, val) -> val.getString(index).toString();
            case BOOLEAN:
                return (index, val) -> val.getBoolean(index);
            case BINARY:
            case VARBINARY:
                return (index, val) -> val.getBinary(index);
            case DECIMAL:
                final int decimalPrecision = ((DecimalType) type).getPrecision();
                final int decimalScale = ((DecimalType) type).getScale();
                return (index, val) ->
                        val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
            case TINYINT:
                return (index, val) -> val.getByte(index);
            case SMALLINT:
                return (index, val) -> val.getShort(index);
            case INTEGER:
                return (index, val) -> val.getInt(index);
            case BIGINT:
                return (index, val) -> val.getLong(index);
            case FLOAT:
                return (index, val) -> val.getFloat(index);
            case DOUBLE:
                return (index, val) -> val.getDouble(index);
            case DATE:
                return (index, val) -> Date.valueOf(LocalDate.ofEpochDay(val.getInt(index)));
            case TIME_WITHOUT_TIME_ZONE:
                return (index, val) ->
                        Time.valueOf(LocalTime.ofSecondOfDay(val.getInt(index) / 1000));
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return (index, val) ->
                        val.getTimestamp(index, DataTypeChecks.getPrecision(type)).toTimestamp();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return (index, val) ->
                        Timestamp.valueOf(
                                ZonedDateTime.ofInstant(
                                                val.getLocalZonedTimestampData(
                                                                index,
                                                                DataTypeChecks.getPrecision(type))
                                                        .toInstant(),
                                                pipelineZoneId)
                                        .toLocalDateTime());
            case TIMESTAMP_WITH_TIME_ZONE:
                final int zonedP = ((ZonedTimestampType) type).getPrecision();
                return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
            case ARRAY:
                return (index, val) -> convertArrayData(val.getArray(index), type);
            case MAP:
                return (index, val) -> writeValueAsString(convertMapData(val.getMap(index), type));
            case ROW:
                return (index, val) ->
                        writeValueAsString(convertRowData(val, index, type, pipelineZoneId));
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }