public void writeRecord()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [388:418]


    /**
     * Buffers a single record and flushes the buffer once a flush condition is met.
     *
     * <p>A {@code null} record is ignored. Otherwise the record is copied via
     * {@code rowDataSerializer}, {@code inputCount} is incremented, and, when
     * {@code replaceNullChar} is enabled, the copy is passed through
     * {@code replaceNullCharFromRecord}. The resulting row is stored in
     * {@code mapBufferWithPk} under the key produced by {@code constructDupKey} when
     * {@code existsPrimaryKeys} is true, and appended to {@code mapBufferWithoutPk} otherwise.
     * Each buffer is accessed while holding its own monitor.
     *
     * <p>After buffering, {@code sync()} is invoked when {@code inputCount} has reached
     * {@code batchSize}, or when the time elapsed since {@code lastWriteTime} exceeds
     * {@code batchWriteTimeout}.
     *
     * @param record the row to buffer; {@code null} is skipped without any effect
     * @throws IOException declared by the signature; presumably raised by {@code sync()} —
     *     confirm against its implementation
     */
    public void writeRecord(RowData record) throws IOException {
        if (record == null) {
            return;
        }

        // Work on a private copy of the incoming row rather than the caller's instance.
        RowData buffered = rowDataSerializer.copy(record);
        inputCount++;

        // Optionally strip '\u0000' characters from the copied row.
        if (replaceNullChar) {
            buffered = replaceNullCharFromRecord(buffered);
        }

        if (!existsPrimaryKeys) {
            // No primary key: rows are simply appended to the list buffer.
            synchronized (mapBufferWithoutPk) {
                mapBufferWithoutPk.add(buffered);
            }
        } else {
            // Primary key present: the primary key string serves as the buffer key.
            synchronized (mapBufferWithPk) {
                mapBufferWithPk.put(constructDupKey(buffered, pkIndex), buffered);
            }
        }

        // Flush when the batch is full; the clock is only consulted if it is not.
        if (inputCount >= batchSize
                || System.currentTimeMillis() - lastWriteTime > batchWriteTimeout) {
            sync();
        }
    }