in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/JdbcRowConverter.java [233:302]
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setBoolean(sqlIndex, val.getBoolean(fieldIndex));
case TINYINT:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setByte(sqlIndex, val.getByte(fieldIndex));
case SMALLINT:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setShort(sqlIndex, val.getShort(fieldIndex));
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setInt(sqlIndex, val.getInt(fieldIndex));
case BIGINT:
case INTERVAL_DAY_TIME:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setLong(sqlIndex, val.getLong(fieldIndex));
case FLOAT:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setFloat(sqlIndex, val.getFloat(fieldIndex));
case DOUBLE:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setDouble(sqlIndex, val.getDouble(fieldIndex));
case CHAR:
case VARCHAR:
// value is BinaryString
return (val, fieldIndex, statement, sqlIndex) ->
statement.setString(sqlIndex, val.getString(fieldIndex).toString());
case BINARY:
case VARBINARY:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setBytes(sqlIndex, val.getBinary(fieldIndex));
case DATE:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setDate(
sqlIndex,
Date.valueOf(LocalDate.ofEpochDay(val.getInt(fieldIndex))));
case TIME_WITHOUT_TIME_ZONE:
return (val, fieldIndex, statement, sqlIndex) ->
statement.setTime(
sqlIndex,
Time.valueOf(
LocalTime.ofNanoOfDay(
val.getInt(fieldIndex) * 1_000_000L)));
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = ((TimestampType) type).getPrecision();
return (val, fieldIndex, statement, sqlIndex) ->
statement.setTimestamp(
sqlIndex,
val.getTimestamp(fieldIndex, timestampPrecision).toTimestamp());
case DECIMAL:
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
return (val, fieldIndex, statement, sqlIndex) ->
statement.setBigDecimal(
sqlIndex,
val.getDecimal(fieldIndex, decimalPrecision, decimalScale)
.toBigDecimal());
case ARRAY:
case MAP:
case MULTISET:
case ROW:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}