in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java [89:127]
/**
 * Builds a {@link JdbcOutputFormat} for {@code RowData} records from the options configured on
 * this builder.
 *
 * <p>Two write paths are supported, selected by the key fields of the DML options:
 *
 * <ul>
 *   <li>key fields present and non-empty: the format is wired with the executor produced by
 *       {@code createBufferReduceExecutor} (upsert path);
 *   <li>otherwise: the dialect's {@code getInsertIntoStatement} result is used with the executor
 *       produced by {@code createSimpleBufferedExecutor} (append-only path).
 * </ul>
 *
 * <p>Fails fast via {@code checkNotNull} when the jdbc options, dml options, execution options or
 * field data types have not been set.
 *
 * @return the configured output format
 */
public JdbcOutputFormat<RowData, ?, ?> build() {
    checkNotNull(jdbcOptions, "jdbc options can not be null");
    checkNotNull(dmlOptions, "jdbc dml options can not be null");
    checkNotNull(executionOptions, "jdbc execution options can not be null");
    // Validate explicitly: otherwise Arrays.stream below fails with a message-less NPE.
    checkNotNull(fieldDataTypes, "jdbc field data types can not be null");
    // NOTE(review): rowDataTypeInformation is not validated here; it is only handed to the
    // executor factories below - confirm whether null is acceptable for them.

    // Resolved once, outside the lambdas, so both executor factories share the same array.
    final LogicalType[] logicalTypes =
            Arrays.stream(fieldDataTypes)
                    .map(DataType::getLogicalType)
                    .toArray(LogicalType[]::new);

    // NOTE(review): the factory lambdas below reference builder fields (dmlOptions,
    // rowDataTypeInformation), so they appear to capture this builder instance - confirm that
    // is intended if the factory is ever serialized.
    if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
        // upsert query
        return new JdbcOutputFormat<>(
                new SimpleJdbcConnectionProvider(jdbcOptions),
                executionOptions,
                ctx ->
                        createBufferReduceExecutor(
                                dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
                JdbcOutputFormat.RecordExtractor.identity());
    } else {
        // append only query: the statement text is generated eagerly, once, at build time
        final String sql =
                dmlOptions
                        .getDialect()
                        .getInsertIntoStatement(
                                dmlOptions.getTableName(), dmlOptions.getFieldNames());
        return new JdbcOutputFormat<>(
                new SimpleJdbcConnectionProvider(jdbcOptions),
                executionOptions,
                ctx ->
                        createSimpleBufferedExecutor(
                                ctx,
                                dmlOptions.getDialect(),
                                dmlOptions.getFieldNames(),
                                logicalTypes,
                                sql,
                                rowDataTypeInformation),
                JdbcOutputFormat.RecordExtractor.identity());
    }
}