in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java [209:268]
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return (val, index, statement) ->
statement.setBoolean(index, val.getBoolean(index));
case TINYINT:
return (val, index, statement) -> statement.setByte(index, val.getByte(index));
case SMALLINT:
return (val, index, statement) -> statement.setShort(index, val.getShort(index));
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, index, statement) -> statement.setInt(index, val.getInt(index));
case BIGINT:
case INTERVAL_DAY_TIME:
return (val, index, statement) -> statement.setLong(index, val.getLong(index));
case FLOAT:
return (val, index, statement) -> statement.setFloat(index, val.getFloat(index));
case DOUBLE:
return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
case CHAR:
case VARCHAR:
// value is BinaryString
return (val, index, statement) ->
statement.setString(index, val.getString(index).toString());
case BINARY:
case VARBINARY:
return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
case DATE:
return (val, index, statement) ->
statement.setDate(
index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
case TIME_WITHOUT_TIME_ZONE:
return (val, index, statement) ->
statement.setTime(
index,
Time.valueOf(
LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = ((TimestampType) type).getPrecision();
return (val, index, statement) ->
statement.setTimestamp(
index, val.getTimestamp(index, timestampPrecision).toTimestamp());
case DECIMAL:
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
return (val, index, statement) ->
statement.setBigDecimal(
index,
val.getDecimal(index, decimalPrecision, decimalScale)
.toBigDecimal());
case ARRAY:
case MAP:
case MULTISET:
case ROW:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}