in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [336:360]
/**
 * Flushes every record currently held in {@code mapBuffer} and resets the
 * batching state.
 *
 * <p>While holding the monitor of {@code mapBuffer}, the buffered tuples are
 * partitioned by their boolean flag {@code f0}: rows flagged {@code true} are
 * passed to {@code batchWrite}, rows flagged {@code false} are passed to
 * {@code batchDelete} when {@code existsPrimaryKeys} is set and to
 * {@code batchDeleteWithoutPk} otherwise. Afterwards the buffer is cleared,
 * {@code inputCount} is reset to zero and {@code lastWriteTime} is set to the
 * current wall-clock time. These resets happen even when the buffer was empty.
 *
 * <p>If one of the batch calls throws, the statements after it are not
 * reached, so the buffer and the counters are left as they were.
 */
public void sync() {
    synchronized (mapBuffer) {
        // Typed local alias of the shared buffer; the field's declared type is
        // not visible from this method alone.
        Collection<Tuple2<Boolean, Row>> buffer = mapBuffer;
        if (!buffer.isEmpty()) {
            List<Row> addBuffer = new ArrayList<>();
            List<Row> deleteBuffer = new ArrayList<>();
            // Presumably f0 == true marks an insert/upsert and f0 == false a
            // retraction, following Flink's retract-stream convention -- TODO confirm.
            for (Tuple2<Boolean, Row> rowTuple2 : buffer) {
                if (rowTuple2.f0) {
                    addBuffer.add(rowTuple2.f1);
                } else {
                    deleteBuffer.add(rowTuple2.f1);
                }
            }
            // NOTE(review): all adds are written before any delete, regardless of
            // the order in which the tuples were buffered -- confirm this is the
            // intended semantics when one key has both kinds in a single batch.
            batchWrite(addBuffer);
            if (existsPrimaryKeys) {
                batchDelete(deleteBuffer);
            } else {
                batchDeleteWithoutPk(deleteBuffer);
            }
        }
        mapBuffer.clear();
        inputCount = 0;
        lastWriteTime = System.currentTimeMillis();
    }
}