public Optional getUpsertStatement()

in flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java [71:126]


    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        List<String> nonUniqueKeyFields =
                Arrays.stream(fieldNames)
                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
                        .collect(Collectors.toList());
        String fieldsProjection =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));

        String valuesBinding =
                Arrays.stream(fieldNames)
                        .map(f -> ":" + f + " " + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String usingClause = String.format("SELECT %s", valuesBinding);
        String onConditions =
                Arrays.stream(uniqueKeyFields)
                        .map(
                                f ->
                                        "[TARGET]."
                                                + quoteIdentifier(f)
                                                + "=[SOURCE]."
                                                + quoteIdentifier(f))
                        .collect(Collectors.joining(" AND "));
        String updateSetClause =
                nonUniqueKeyFields.stream()
                        .map(
                                f ->
                                        "[TARGET]."
                                                + quoteIdentifier(f)
                                                + "=[SOURCE]."
                                                + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String insertValues =
                Arrays.stream(fieldNames)
                        .map(f -> "[SOURCE]." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                String.format(
                        "MERGE INTO %s AS [TARGET]"
                                + " USING (%s) AS [SOURCE]"
                                + " ON (%s)"
                                + " WHEN MATCHED THEN"
                                + " UPDATE SET %s"
                                + " WHEN NOT MATCHED THEN"
                                + " INSERT (%s) VALUES (%s);",
                        quoteIdentifier(tableName),
                        usingClause,
                        onConditions,
                        updateSetClause,
                        fieldsProjection,
                        insertValues));
    }