public void sync()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [486:531]


    public void sync() {
        int total = mapBufferWithPk.size() + mapBufferWithoutPk.size();
        if (1 == verbose && total > 0) {
            LOG.info("start to sync " + total + " records.");
        }
        // Synchronized mapBuffer or mapBufferWithoutPk according to existsPrimaryKeys
        synchronized (existsPrimaryKeys ? mapBufferWithPk : mapBufferWithoutPk) {
            List<RowData> addBuffer = new ArrayList<>();
            List<RowData> deleteBuffer = new ArrayList<>();
            Collection<RowData> buffer = existsPrimaryKeys ? mapBufferWithPk.values() : mapBufferWithoutPk;
            if (!buffer.isEmpty()) {
                for (RowData row : buffer) {
                    switch (row.getRowKind()) {
                        case INSERT:
                        case UPDATE_AFTER:
                            addBuffer.add(row);
                            break;
                        case DELETE:
                        case UPDATE_BEFORE:
                            deleteBuffer.add(row);
                            break;
                        default:
                            throw new RuntimeException(
                                    "Not supported row kind " + row.getRowKind());
                    }
                }
                batchAdd(addBuffer);

                if (!deleteBuffer.isEmpty()) {
                    if (existsPrimaryKeys) {
                        batchDeleteWithPK(deleteBuffer);
                    } else {
                        batchDeleteWithoutPk(deleteBuffer);
                    }
                }
            }
            if (1 == verbose && total > 0) {
                LOG.info("finished syncing " + total + " records.");
            }
            // Clear mapBuffer and mapBufferWithoutPk
            mapBufferWithPk.clear();
            mapBufferWithoutPk.clear();
            inputCount = 0;
            lastWriteTime = System.currentTimeMillis();
        }
    }