in flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java [71:126]
/**
 * Builds a SQL Server {@code MERGE} statement that inserts a row, or updates the non-key
 * columns of the row whose unique-key columns match.
 *
 * <p>Every field is emitted in the {@code USING (SELECT ...)} source as a named placeholder
 * ({@code :field}) aliased to the quoted column name; the {@code ON} condition compares the
 * unique-key columns of {@code [TARGET]} and {@code [SOURCE]}.
 *
 * <p>If every field is part of the unique key there is nothing left to update. In that case
 * the {@code WHEN MATCHED} branch is omitted, because an empty {@code UPDATE SET} list would
 * make the statement syntactically invalid; the result then only inserts missing rows.
 *
 * @param tableName name of the target table; quoted via {@code quoteIdentifier}
 * @param fieldNames all column names written by the statement
 * @param uniqueKeyFields column names used to match an existing row
 * @return an always-present {@link Optional} holding the generated statement
 */
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // Wrap the key array once instead of once per filtered field.
    List<String> uniqueKeys = Arrays.asList(uniqueKeyFields);

    String fieldsProjection =
            Arrays.stream(fieldNames)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));

    // ":name [name]" pairs: a named parameter aliased to its column name.
    String valuesBinding =
            Arrays.stream(fieldNames)
                    .map(f -> ":" + f + " " + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));

    String onConditions =
            Arrays.stream(uniqueKeyFields)
                    .map(this::targetEqualsSource)
                    .collect(Collectors.joining(" AND "));

    // Only columns outside the unique key are overwritten on a match.
    String updateSetClause =
            Arrays.stream(fieldNames)
                    .filter(f -> !uniqueKeys.contains(f))
                    .map(this::targetEqualsSource)
                    .collect(Collectors.joining(", "));

    String insertValues =
            Arrays.stream(fieldNames)
                    .map(f -> "[SOURCE]." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));

    // With no non-key columns, "UPDATE SET" would have an empty assignment list, so the
    // whole WHEN MATCHED branch is dropped (MERGE only requires one WHEN clause).
    String matchedBranch =
            updateSetClause.isEmpty()
                    ? ""
                    : " WHEN MATCHED THEN UPDATE SET " + updateSetClause;

    return Optional.of(
            String.format(
                    "MERGE INTO %s AS [TARGET]"
                            + " USING (SELECT %s) AS [SOURCE]"
                            + " ON (%s)"
                            + "%s"
                            + " WHEN NOT MATCHED THEN"
                            + " INSERT (%s) VALUES (%s);",
                    quoteIdentifier(tableName),
                    valuesBinding,
                    onConditions,
                    matchedBranch,
                    fieldsProjection,
                    insertValues));
}

/** Renders {@code [TARGET].<col>=[SOURCE].<col>} for the given column, with the name quoted. */
private String targetEqualsSource(String field) {
    String quoted = quoteIdentifier(field);
    return "[TARGET]." + quoted + "=[SOURCE]." + quoted;
}