in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/AdbpgDialect.java [74:94]
/**
 * Builds an upsert statement by appending an {@code ON CONFLICT} clause to the statement
 * returned by {@code getInsertIntoStatement(tableName, fieldNames)}.
 *
 * <p>When {@code UpdateFields} is non-empty the result has the form
 * {@code <insert> ON CONFLICT (<unique keys>) DO UPDATE SET <col>=EXCLUDED.<col>, ...}.
 * When {@code UpdateFields} is empty the result ends in {@code DO NOTHING} instead, because
 * {@code DO UPDATE SET} followed by no assignments is not valid SQL.
 *
 * @param tableName table name, passed through to {@code getInsertIntoStatement}
 * @param fieldNames column names, passed through to {@code getInsertIntoStatement}
 * @param uniqueKeyFields columns listed as the conflict target; each one is passed through
 *     {@code quoteIdentifier}
 * @param UpdateFields columns to overwrite with the {@code EXCLUDED} row's values on conflict;
 *     may be empty
 * @return the insert statement followed by the {@code ON CONFLICT} clause
 */
public String getUpsertStatement(
        String tableName,
        String[] fieldNames,
        String[] uniqueKeyFields,
        String[] UpdateFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    String conflictTarget = " ON CONFLICT (" + uniqueColumns + ")";

    String conflictAction;
    if (UpdateFields.length == 0) {
        // Nothing to overwrite (presumably every column belongs to the unique key - confirm
        // against callers). An empty "DO UPDATE SET" would be rejected by the database, so
        // keep the existing row untouched instead.
        conflictAction = " DO NOTHING";
    } else {
        String updateClause =
                Arrays.stream(UpdateFields)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        conflictAction = " DO UPDATE SET " + updateClause;
    }

    return getInsertIntoStatement(tableName, fieldNames) + conflictTarget + conflictAction;
}