in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java [80:112]
/**
 * Builds a {@link JdbcOutputFormat} for {@code RowData} records from the options configured on
 * this builder.
 *
 * <p>When the DML options carry a non-empty set of key fields, the returned format is backed by
 * the executor produced by {@code createBufferReduceExecutor} (upsert path). Otherwise an
 * INSERT statement is generated through the dialect and the format is backed by the executor
 * produced by {@code createSimpleBufferedExecutor} (append-only path).
 *
 * @return the output format wired with a {@code SimpleJdbcConnectionProvider}, the execution
 *     options and the executor factory matching the selected path
 * @throws NullPointerException if the jdbc options, dml options, execution options or field
 *     data types have not been set (raised by {@code checkNotNull})
 */
public JdbcOutputFormat<RowData, ?, ?> build() {
    checkNotNull(jdbcOptions, "jdbc options can not be null");
    checkNotNull(dmlOptions, "jdbc dml options can not be null");
    checkNotNull(executionOptions, "jdbc execution options can not be null");
    // Validate explicitly so a missing value is reported with a descriptive message instead of
    // the bare NullPointerException that Arrays.stream(null) would raise below.
    checkNotNull(fieldDataTypes, "field data types can not be null");

    final LogicalType[] logicalTypes =
            Arrays.stream(fieldDataTypes)
                    .map(DataType::getLogicalType)
                    .toArray(LogicalType[]::new);

    // An absent Optional and an empty key array both select the append-only path.
    final boolean hasKeyFields =
            dmlOptions.getKeyFields().filter(keys -> keys.length > 0).isPresent();

    if (hasKeyFields) {
        // upsert query
        return new JdbcOutputFormat<>(
                new SimpleJdbcConnectionProvider(jdbcOptions),
                executionOptions,
                () -> createBufferReduceExecutor(dmlOptions, logicalTypes));
    }

    // append only query: the INSERT statement is generated once, up front, and captured by the
    // executor factory below.
    final String sql =
            dmlOptions
                    .getDialect()
                    .getInsertIntoStatement(
                            dmlOptions.getTableName(), dmlOptions.getFieldNames());
    return new JdbcOutputFormat<>(
            new SimpleJdbcConnectionProvider(jdbcOptions),
            executionOptions,
            () ->
                    createSimpleBufferedExecutor(
                            dmlOptions.getDialect(),
                            dmlOptions.getFieldNames(),
                            logicalTypes,
                            sql));
}