in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java [118:173]
protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return val -> null;
case BOOLEAN:
case FLOAT:
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
return val -> val;
case TINYINT:
return val -> ((Integer) val).byteValue();
case SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
case INTEGER:
return val -> val;
case BIGINT:
return val -> val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return val -> val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case DATE:
return val -> (int) (((Date) val).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val -> val instanceof LocalDateTime
? TimestampData.fromLocalDateTime((LocalDateTime) val)
: TimestampData.fromTimestamp((Timestamp) val);
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case BINARY:
case VARBINARY:
return val -> (byte[]) val;
case ARRAY:
case ROW:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}