in flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialect.java [71:93]
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // Result shape: "<insert statement> ON CONFLICT (<quoted key columns>)<action>".
    // <action> overwrites every non-key column with the EXCLUDED value, or is
    // " DO NOTHING" when all of fieldNames are also unique-key fields.

    // Conflict target: the quoted unique-key columns, comma separated.
    final String conflictTarget =
            Arrays.stream(uniqueKeyFields)
                    .map(key -> quoteIdentifier(key))
                    .collect(Collectors.joining(", "));

    // Collect "col=EXCLUDED.col" for each column that is not part of the unique key.
    final Set<String> keyNames = new HashSet<>(Arrays.asList(uniqueKeyFields));
    final StringBuilder assignments = new StringBuilder();
    for (String column : fieldNames) {
        if (keyNames.contains(column)) {
            continue;
        }
        if (assignments.length() > 0) {
            assignments.append(", ");
        }
        assignments
                .append(quoteIdentifier(column))
                .append("=EXCLUDED.")
                .append(quoteIdentifier(column));
    }

    final StringBuilder statement =
            new StringBuilder()
                    .append(getInsertIntoStatement(tableName, fieldNames))
                    .append(" ON CONFLICT (")
                    .append(conflictTarget)
                    .append(")");

    if (assignments.length() == 0) {
        // Nothing besides the key columns to overwrite, so an empty SET list is
        // avoided by leaving the existing row as it is.
        statement.append(" DO NOTHING");
    } else {
        statement.append(" DO UPDATE SET ").append(assignments);
    }
    return Optional.of(statement.toString());
}