in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/JdbcRowConverter.java [216:231]
protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
final int sqlType =
JdbcTypeUtil.typeInformationToSqlType(
TypeConversions.fromDataTypeToLegacyInfo(
TypeConversions.fromLogicalToDataType(type)));
return (val, fieldIndex, statement, sqlIndex) -> {
if (val == null
|| val.isNullAt(fieldIndex)
|| LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
statement.setNull(sqlIndex, sqlType);
} else {
jdbcSerializationConverter.serialize(val, fieldIndex, statement, sqlIndex);
}
};
}