in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java [89:112]
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig config = helper.getOptions();
helper.validate();
validateConfigOptions(config, context.getClassLoader());
validateDataTypeWithJdbcDialect(
context.getPhysicalRowDataType(),
config.get(URL),
config.get(COMPATIBLE_MODE),
context.getClassLoader());
InternalJdbcConnectionOptions jdbcOptions =
getJdbcOptions(config, context.getClassLoader());
return new JdbcDynamicTableSink(
jdbcOptions,
getJdbcExecutionOptions(config),
getJdbcDmlOptions(
jdbcOptions,
context.getPhysicalRowDataType(),
context.getPrimaryKeyIndexes()),
context.getPhysicalRowDataType());
}