private void batchAdd()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [668:738]


    private void batchAdd(List<RowData> rows) {
        long bps = 0;
        if (null == rows || rows.isEmpty()) {
            return;
        }
        try {
            long start = System.currentTimeMillis();
            // TODO add a "copy on conflict" mode directly to replace "copy on conflict" when writemode is "copy" and conflictmode is "upsert"
            if (writeMode == 1) {                   /** copy, this is the default write mode */
                // preprocess data to be written to the database using copy writemode
                byte[] data = preprocessCopyData(rows);
                bps = executeCopy(data);
                long end = System.currentTimeMillis();
                reportMetric(rows, start, end, bps);
            } else if (writeMode == 2) {            /** batch upsert */
                String sql = adbpgDialect.getUpsertStatement(tableName, fieldNamesStrs, primaryFieldNamesStr, nonPrimaryFieldNamesStr);
                executeSqlWithPrepareStatement(sql, rows, rowConverter, false);
            } else if (writeMode == 0) {            /** batch insert */
                String insertSql = adbpgDialect.getInsertIntoStatement(tableName, fieldNamesStrs);
                executeSqlWithPrepareStatement(insertSql, rows, rowConverter, false);
            } else if (writeMode == 3) {            /** merge with streaming server api */
                executeMergeWithStreamingServer(rows, streamingServerRowConverter, false);
            } else {
                LOG.error("Unsupported write mode: " + writeMode);
                System.exit(255);
            }
            closeConnection();
        } catch (Exception exception) {
            // Get the exception message and check if it is a duplicate key exception
            String message = exception.getMessage();
            boolean isDuplicateKeyException = false;
                    if (message != null) {
                        isDuplicateKeyException = message.contains("duplicate key")
                                && message.contains("violates unique constraint")
                                || message.contains("ON CONFLICT DO UPDATE");
                    }
            if (isDuplicateKeyException) {
                LOG.warn("Batch write failed with duplicate-key exception, will retry with preset conflict-mode.");
            } else {
                LOG.warn("Batch write failed with exception will retry with preset conflict action. The exception is:", exception);
            }

            // Batch upsert demotes to single upsert when conflictMode='upsert' or writeMode=2
            // note that exception generated by prepared statement stack have one extra layer
            for (RowData row : rows) {
                // note that exception generated by prepared statement stack have one extra layer
                if (exception instanceof BatchUpdateException) {
                    exception = ((BatchUpdateException) exception).getNextException();
                }

                if (isDuplicateKeyException) {
                    if ("strict".equalsIgnoreCase(conflictMode)) {                    /** conflictMode = 'strict', report error without any action */
                        throw new RuntimeException("duplicate key value violates unique constraint");
                    } else if ("upsert".equalsIgnoreCase(conflictMode)) {             /** conflictMode = 'upsert', use upsert sql */
                        LOG.warn("Retrying to replace record with upsert.");
                        upsertRow(row);
                    } else if ("ignore".equalsIgnoreCase(conflictMode)) {
                        LOG.warn("Batch write failed, because preset conflictmode is 'ignore', connector will skip this row");
                    } else {                                                           /** conflictMode = 'update' or any other string, use update sql */
                        updateRow(row);
                    }
                } else {
                    // exceptionMode only have "strict" and "ignore", if this is "ignore" return directly without report an expection
                    if ("strict".equalsIgnoreCase(exceptionMode)) {
                        LOG.error("Found unexpect exception, will ignore this row.", exception);
                        System.exit(255);
                    }
                }
            }
        }
    }