public Optional getUpsertStatement()

in flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/dialect/Db2Dialect.java [72:124]


    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        List<String> nonUniqueKeyFields =
                Arrays.stream(fieldNames)
                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
                        .collect(Collectors.toList());
        String fieldsProjection =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));

        String valuesBinding =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));

        String columnBinding =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));

        String onConditions =
                Arrays.stream(uniqueKeyFields)
                        .map(f -> "TARGET." + quoteIdentifier(f) + "= SOURCE." + quoteIdentifier(f))
                        .collect(Collectors.joining(" AND "));
        String updateSetClause =
                nonUniqueKeyFields.stream()
                        .map(f -> "TARGET." + quoteIdentifier(f) + "= SOURCE." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String insertValues =
                Arrays.stream(fieldNames)
                        .map(f -> "SOURCE." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        Optional<String> format =
                Optional.of(
                        String.format(
                                "MERGE INTO %s AS TARGET"
                                        + " USING TABLE (VALUES ( %s )) AS SOURCE ( %s )"
                                        + " ON (%s)"
                                        + " WHEN MATCHED THEN"
                                        + " UPDATE SET %s"
                                        + " WHEN NOT MATCHED THEN"
                                        + " INSERT (%s) VALUES (%s);",
                                quoteIdentifier(tableName),
                                valuesBinding,
                                columnBinding,
                                onConditions,
                                updateSetClause,
                                fieldsProjection,
                                insertValues));

        return format;
    }