in flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/dialect/OracleDialect.java [75:128]
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
String sourceFields =
Arrays.stream(fieldNames)
.map(f -> ":" + f + " " + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
.collect(Collectors.joining(" and "));
final Set<String> uniqueKeyFieldsSet =
Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
String updateClause =
Arrays.stream(fieldNames)
.filter(f -> !uniqueKeyFieldsSet.contains(f))
.map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String insertFields =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String valuesClause =
Arrays.stream(fieldNames)
.map(f -> "s." + quoteIdentifier(f))
.collect(Collectors.joining(", "));
// if we can't divide schema and table-name is risky to call quoteIdentifier(tableName)
// for example [tbo].[sometable] is ok but [tbo.sometable] is not
String mergeQuery =
" MERGE INTO "
+ tableName
+ " t "
+ " USING (SELECT "
+ sourceFields
+ " FROM DUAL) s "
+ " ON ("
+ onClause
+ ") "
+ " WHEN MATCHED THEN UPDATE SET "
+ updateClause
+ " WHEN NOT MATCHED THEN INSERT ("
+ insertFields
+ ")"
+ " VALUES ("
+ valuesClause
+ ")";
return Optional.of(mergeQuery);
}