in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java [195:207]
protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
final int sqlType = JdbcTypeUtil.logicalTypeToSqlType(type.getTypeRoot());
return (val, index, statement) -> {
if (val == null
|| val.isNullAt(index)
|| LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
statement.setNull(index, sqlType);
} else {
jdbcSerializationConverter.serialize(val, index, statement);
}
};
}