in flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/dialect/Db2Dialect.java [72:124]
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
List<String> nonUniqueKeyFields =
Arrays.stream(fieldNames)
.filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
.collect(Collectors.toList());
String fieldsProjection =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String valuesBinding =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
String columnBinding =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String onConditions =
Arrays.stream(uniqueKeyFields)
.map(f -> "TARGET." + quoteIdentifier(f) + "= SOURCE." + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String updateSetClause =
nonUniqueKeyFields.stream()
.map(f -> "TARGET." + quoteIdentifier(f) + "= SOURCE." + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String insertValues =
Arrays.stream(fieldNames)
.map(f -> "SOURCE." + quoteIdentifier(f))
.collect(Collectors.joining(", "));
Optional<String> format =
Optional.of(
String.format(
"MERGE INTO %s AS TARGET"
+ " USING TABLE (VALUES ( %s )) AS SOURCE ( %s )"
+ " ON (%s)"
+ " WHEN MATCHED THEN"
+ " UPDATE SET %s"
+ " WHEN NOT MATCHED THEN"
+ " INSERT (%s) VALUES (%s);",
quoteIdentifier(tableName),
valuesBinding,
columnBinding,
onConditions,
updateSetClause,
fieldsProjection,
insertValues));
return format;
}