in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/StringFormatRowConverter.java [93:148]
protected StringFormatConverter createStringFormatConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return (val, index) -> val.getBoolean(index) ? "'true'" : "'false'";
case TINYINT:
return (val, index) -> "" + ((Byte) val.getByte(index)).intValue();
case SMALLINT:
return (val, index) -> String.valueOf(val.getShort(index));
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, index) -> String.valueOf(val.getInt(index));
case BIGINT:
case INTERVAL_DAY_TIME:
return (val, index) -> String.valueOf(val.getLong(index));
case FLOAT:
return (val, index) -> String.valueOf(val.getFloat(index));
case DOUBLE:
return (val, index) -> String.valueOf(val.getDouble(index));
case DECIMAL:
return (val, index) -> {
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
return String.valueOf(
val.getDecimal(index, decimalPrecision, decimalScale)
.toBigDecimal());
};
case CHAR:
case VARCHAR:
// value is BinaryString
return (val, index) -> toCopyField(val.getString(index).toString());
case BINARY:
case VARBINARY:
return (val, index) -> new String(val.getBinary(index));
case DATE:
return (val, index) ->
String.valueOf(Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
case TIME_WITHOUT_TIME_ZONE:
return (val, index) ->
String.valueOf(
Time.valueOf(
LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = ((TimestampType) type).getPrecision();
return (val, index) ->
String.valueOf(val.getTimestamp(index, timestampPrecision).toTimestamp());
case ARRAY:
case MAP:
case MULTISET:
case ROW:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}