protected StreamingServerSerializationConverter createExternalConverter()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/StreamingServerRowConverter.java [245:301]


    /**
     * Creates the converter that reads one field of a row and appends it, as a {@code DBValue}
     * column, to the outgoing request builder.
     *
     * <p>Mapping by logical type root:
     * <ul>
     *   <li>INTEGER, INTERVAL_YEAR_MONTH: int32 value</li>
     *   <li>BIGINT, INTERVAL_DAY_TIME: int64 value</li>
     *   <li>FLOAT / DOUBLE: float32 / float64 value</li>
     *   <li>DECIMAL, BOOLEAN, TINYINT, SMALLINT, DATE, CHAR, VARCHAR: string value, read with the
     *       accessor that matches the field's internal representation</li>
     *   <li>BINARY, VARBINARY: bytes value</li>
     *   <li>TIME, TIMESTAMP (without time zone): protobuf timestamp value</li>
     * </ul>
     *
     * <p>The returned converters do not check for null fields.
     *
     * @param type logical type of the field the converter will be applied to
     * @return a converter for fields of the given type
     * @throws UnsupportedOperationException if the type has no mapping (ARRAY, MAP, MULTISET, ROW,
     *     RAW and any type root not listed above)
     */
    protected StreamingServerSerializationConverter createExternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setInt32Value(val.getInt(fieldIndex)));
            case INTERVAL_DAY_TIME:
            case BIGINT:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setInt64Value(val.getLong(fieldIndex)));
            case FLOAT:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setFloat32Value(val.getFloat(fieldIndex)));
            case DOUBLE:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setFloat64Value(val.getDouble(fieldIndex)));
            // The types below are sent as text. Each one is read with its own typed accessor:
            // calling getString() on a field that is not string data fails or reads garbage.
            case DECIMAL: {
                final org.apache.flink.table.types.logical.DecimalType decimalType =
                        (org.apache.flink.table.types.logical.DecimalType) type;
                final int decimalPrecision = decimalType.getPrecision();
                final int decimalScale = decimalType.getScale();
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setStringValue(
                                val.getDecimal(fieldIndex, decimalPrecision, decimalScale)
                                        .toBigDecimal()
                                        .toPlainString()));
            }
            case BOOLEAN:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setStringValue(
                                String.valueOf(val.getBoolean(fieldIndex))));
            case TINYINT:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setStringValue(
                                String.valueOf(val.getByte(fieldIndex))));
            case SMALLINT:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setStringValue(
                                String.valueOf(val.getShort(fieldIndex))));
            case DATE:
                // DATE is held as an int counting days since 1970-01-01; render it as yyyy-MM-dd.
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setStringValue(
                                java.time.LocalDate.ofEpochDay(val.getInt(fieldIndex)).toString()));
            // NOTE(review): TIMESTAMP_WITH_TIME_ZONE has no dedicated RowData accessor, so it is
            // still read as string data like CHAR/VARCHAR - confirm such fields really carry text.
            case TIMESTAMP_WITH_TIME_ZONE:
            case CHAR:
            case VARCHAR:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setStringValue(val.getString(fieldIndex).toString()));
            case BINARY:
            case VARBINARY:
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setBytesValue(ByteString.copyFrom(val.getBinary(fieldIndex))));
            case TIME_WITHOUT_TIME_ZONE:
                // TIME is held as an int of milliseconds of the day, and its logical type is not a
                // TimestampType, so it cannot share the TIMESTAMP branch.
                // NOTE(review): encoded as that time of day on 1970-01-01 through the same
                // java.sql.Timestamp path as TIMESTAMP - confirm the receiver expects this.
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setTimeStampValue(
                                toProtobufTimestamp(java.sql.Timestamp.valueOf(
                                        java.time.LocalDateTime.of(
                                                java.time.LocalDate.ofEpochDay(0),
                                                java.time.LocalTime.ofNanoOfDay(
                                                        val.getInt(fieldIndex) * 1_000_000L))))));
            case TIMESTAMP_WITHOUT_TIME_ZONE: {
                // Resolve the precision once here rather than on every converted row.
                final int timestampPrecision = ((TimestampType) type).getPrecision();
                return (val, fieldIndex, builder) ->
                        builder.addColumns(DBValue.newBuilder().setTimeStampValue(
                                toProtobufTimestamp(
                                        val.getTimestamp(fieldIndex, timestampPrecision).toTimestamp())));
            }
            // Does not support
            case ARRAY:
            case MAP:
            case MULTISET:
            case ROW:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    /**
     * Converts a SQL timestamp into a protobuf timestamp.
     *
     * <p>{@code getTime()} returns epoch milliseconds, so it is floor-divided by 1000 to obtain
     * whole seconds; {@code getNanos()} already holds the complete non-negative fraction of the
     * second. Floor division keeps the pair consistent for instants before the epoch.
     *
     * @param timestamp the SQL timestamp to convert
     * @return the equivalent protobuf timestamp
     */
    private static com.google.protobuf.Timestamp toProtobufTimestamp(java.sql.Timestamp timestamp) {
        return com.google.protobuf.Timestamp.newBuilder()
                .setSeconds(Math.floorDiv(timestamp.getTime(), 1000L))
                .setNanos(timestamp.getNanos())
                .build();
    }