in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [388:418]
/**
 * Buffers a single record and triggers {@code sync()} once either the batch-size or the
 * batch-timeout threshold is reached.
 *
 * <p>A {@code null} record is ignored and does not count towards the batch. Otherwise the
 * record is copied through {@code rowDataSerializer} before being stored, so the buffered
 * row is the serializer's copy rather than the instance passed in.
 *
 * @param record the incoming row; may be {@code null}, in which case nothing happens
 * @throws IOException declared by the signature; presumably propagated from {@code sync()}
 *     (its body is not visible here -- confirm)
 */
public void writeRecord(RowData record) throws IOException {
    if (record == null) {
        return;
    }

    final RowData copied = rowDataSerializer.copy(record);
    // The counter is bumped before the optional clean-up step, so a failure in that step
    // still leaves the record counted.
    inputCount++;

    // Optionally strip '\u0000' characters from the row.
    final RowData row = replaceNullChar ? replaceNullCharFromRecord(copied) : copied;

    if (!existsPrimaryKeys) {
        // No primary key: simply append the row to the buffer.
        synchronized (mapBufferWithoutPk) {
            mapBufferWithoutPk.add(row);
        }
    } else {
        // Primary key present: store the row under a key built from its primary-key
        // fields. NOTE(review): presumably a later row with the same key replaces the
        // earlier one -- depends on the buffer's map type, confirm.
        synchronized (mapBufferWithPk) {
            mapBufferWithPk.put(constructDupKey(row, pkIndex), row);
        }
    }

    // Flush when the batch is full; otherwise flush if more than batchWriteTimeout
    // milliseconds have elapsed since lastWriteTime. The clock is only consulted when
    // the size threshold has not been reached.
    if (inputCount >= batchSize
            || System.currentTimeMillis() - this.lastWriteTime > this.batchWriteTimeout) {
        sync();
    }
}