in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/StreamingServerRowConverter.java [169:205]
/**
 * Creates the per-field converter for the given logical type.
 *
 * <p>The converter is selected by {@code type.getTypeRoot()}. Most numeric and interval
 * values are passed through unchanged; {@code TINYINT} and {@code SMALLINT} narrow an
 * {@link Integer} to the matching width; time and timestamp values are converted from
 * their {@code java.sql} representation; date, decimal and character values are expected
 * as {@link String} and wrapped in {@code StringData}.
 *
 * @param type logical type of the field the converter will handle
 * @return a converter for values of that type
 * @throws UnsupportedOperationException if the type root is not one of the cases below
 */
protected StreamingServerDeserializationConverter createInternalConverter(LogicalType type) {
    switch (type.getTypeRoot()) {
        case NULL:
            // A NULL-typed field always yields null, whatever the input is.
            return val -> null;
        case BOOLEAN:
        case FLOAT:
        case DOUBLE:
        case INTERVAL_YEAR_MONTH:
        case INTERVAL_DAY_TIME:
        case INTEGER:
        case BIGINT:
            // These values are handed on exactly as received.
            return val -> val;
        case TINYINT:
            // Narrow an Integer to a byte. Any other value (including null) is passed
            // through untouched, mirroring the SMALLINT branch, instead of failing with a
            // NullPointerException / ClassCastException on the unconditional cast.
            return val -> val instanceof Integer ? ((Integer) val).byteValue() : val;
        case SMALLINT:
            // Narrow an Integer to a short; any other value is passed through untouched.
            return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
        case TIME_WITHOUT_TIME_ZONE:
            // java.sql.Time -> milliseconds of the day as an int.
            return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return val -> TimestampData.fromTimestamp((Timestamp) val);
        case DATE:
            // gRPC does not support a date type, so the value is represented as a string
            // and cast to the date type on the server side.
        case DECIMAL:
            // gRPC does not support a decimal type, so the value is represented as a string
            // and cast to the decimal type on the server side.
        case CHAR:
        case VARCHAR:
            // DATE and DECIMAL intentionally fall through to the string handling.
            return val -> StringData.fromString((String) val);
        case BINARY:
        case VARBINARY:
            return val -> (byte[]) val;
        default:
            throw new UnsupportedOperationException("Unsupported type:" + type);
    }
}