protected JdbcSerializationConverter createExternalConverter()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/JdbcRowConverter.java [233:302]


    protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setBoolean(sqlIndex, val.getBoolean(fieldIndex));
            case TINYINT:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setByte(sqlIndex, val.getByte(fieldIndex));
            case SMALLINT:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setShort(sqlIndex, val.getShort(fieldIndex));
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setInt(sqlIndex, val.getInt(fieldIndex));
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setLong(sqlIndex, val.getLong(fieldIndex));
            case FLOAT:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setFloat(sqlIndex, val.getFloat(fieldIndex));
            case DOUBLE:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setDouble(sqlIndex, val.getDouble(fieldIndex));
            case CHAR:
            case VARCHAR:
                // value is BinaryString
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setString(sqlIndex, val.getString(fieldIndex).toString());
            case BINARY:
            case VARBINARY:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setBytes(sqlIndex, val.getBinary(fieldIndex));
            case DATE:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setDate(
                                sqlIndex,
                                Date.valueOf(LocalDate.ofEpochDay(val.getInt(fieldIndex))));
            case TIME_WITHOUT_TIME_ZONE:
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setTime(
                                sqlIndex,
                                Time.valueOf(
                                        LocalTime.ofNanoOfDay(
                                                val.getInt(fieldIndex) * 1_000_000L)));
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                final int timestampPrecision = ((TimestampType) type).getPrecision();
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setTimestamp(
                                sqlIndex,
                                val.getTimestamp(fieldIndex, timestampPrecision).toTimestamp());
            case DECIMAL:
                final int decimalPrecision = ((DecimalType) type).getPrecision();
                final int decimalScale = ((DecimalType) type).getScale();
                return (val, fieldIndex, statement, sqlIndex) ->
                        statement.setBigDecimal(
                                sqlIndex,
                                val.getDecimal(fieldIndex, decimalPrecision, decimalScale)
                                        .toBigDecimal());
            case ARRAY:
            case MAP:
            case MULTISET:
            case ROW:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }