in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [486:531]
/**
 * Drains the currently buffered rows and hands them to the batch write helpers.
 *
 * <p>While holding the monitor of the active buffer ({@code mapBufferWithPk} when
 * {@code existsPrimaryKeys} is true, otherwise {@code mapBufferWithoutPk}), the buffered rows
 * are partitioned by row kind:
 *
 * <ul>
 *   <li>{@code INSERT} and {@code UPDATE_AFTER} rows are passed to {@code batchAdd};
 *   <li>{@code DELETE} and {@code UPDATE_BEFORE} rows are passed to
 *       {@code batchDeleteWithPK} or {@code batchDeleteWithoutPk}, depending on
 *       {@code existsPrimaryKeys}.
 * </ul>
 *
 * <p>Afterwards both buffers are cleared, {@code inputCount} is reset to 0 and
 * {@code lastWriteTime} is set to the current wall-clock time. These resets happen even when
 * the active buffer was empty.
 *
 * @throws RuntimeException if a buffered row has a row kind other than the four listed above;
 *     in that case the method exits before the buffers are cleared
 */
public void sync() {
    // Synchronized mapBufferWithPk or mapBufferWithoutPk according to existsPrimaryKeys
    synchronized (existsPrimaryKeys ? mapBufferWithPk : mapBufferWithoutPk) {
        // Read the sizes while holding the monitor rather than before acquiring it, so the
        // logged count is taken from the same state that is partitioned and cleared below
        // (assuming writers to the active buffer take the same monitor -- confirm in callers).
        int total = mapBufferWithPk.size() + mapBufferWithoutPk.size();
        if (1 == verbose && total > 0) {
            LOG.info("start to sync " + total + " records.");
        }

        List<RowData> addBuffer = new ArrayList<>();
        List<RowData> deleteBuffer = new ArrayList<>();
        Collection<RowData> buffer =
                existsPrimaryKeys ? mapBufferWithPk.values() : mapBufferWithoutPk;
        if (!buffer.isEmpty()) {
            // Split the buffered rows into upserts and deletes.
            for (RowData row : buffer) {
                switch (row.getRowKind()) {
                    case INSERT:
                    case UPDATE_AFTER:
                        addBuffer.add(row);
                        break;
                    case DELETE:
                    case UPDATE_BEFORE:
                        deleteBuffer.add(row);
                        break;
                    default:
                        throw new RuntimeException(
                                "Not supported row kind " + row.getRowKind());
                }
            }
            // Adds are issued before deletes; batchAdd is invoked even when addBuffer is
            // empty, whereas the delete helpers are only invoked when there is work to do.
            batchAdd(addBuffer);
            if (!deleteBuffer.isEmpty()) {
                if (existsPrimaryKeys) {
                    batchDeleteWithPK(deleteBuffer);
                } else {
                    batchDeleteWithoutPk(deleteBuffer);
                }
            }
        }
        if (1 == verbose && total > 0) {
            LOG.info("finished syncing " + total + " records.");
        }
        // Clear mapBufferWithPk and mapBufferWithoutPk
        mapBufferWithPk.clear();
        mapBufferWithoutPk.clear();
        inputCount = 0;
        lastWriteTime = System.currentTimeMillis();
    }
}