in pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java [128:163]
/**
 * Drains the currently buffered records and writes them to the HBase table as one batch.
 *
 * <p>The buffer ({@code incomingList}) is swapped for a fresh list while holding this
 * instance's monitor; the conversion and the batch call then run outside the lock on the
 * detached list.
 *
 * <p>Outcome per record:
 * <ul>
 *   <li>if {@code bindValue} throws for a record, that record is failed immediately and is
 *       excluded from the later ack/fail pass;</li>
 *   <li>if the batch call (and the subsequent acks) complete without an exception, every
 *       successfully bound record is acked;</li>
 *   <li>if an exception is thrown in that phase, every successfully bound record is failed.</li>
 * </ul>
 */
private void flush() {
    List<Record<T>> toFlushList;
    synchronized (this) {
        if (incomingList.isEmpty()) {
            return;
        }
        // Detach the current buffer so the rest of the work happens without holding the lock.
        toFlushList = incomingList;
        incomingList = Lists.newArrayList();
    }

    List<Put> puts = new ArrayList<>();
    // Records whose conversion succeeded. They are tracked in a separate list rather than by
    // removing failures from toFlushList: removing from a list while a for-each loop iterates
    // over it raises ConcurrentModificationException (or silently skips an element).
    List<Record<T>> boundRecords = new ArrayList<>(toFlushList.size());
    for (Record<T> record : toFlushList) {
        try {
            bindValue(record, puts);
            boundRecords.add(record);
        } catch (Exception e) {
            // NOTE(review): if bindValue can append to puts before throwing, a Put for this
            // failed record may still be part of the batch - confirm against bindValue.
            record.fail();
            log.warn("Record flush thread was exception ", e);
        }
    }

    try {
        if (CollectionUtils.isNotEmpty(puts)) {
            table.batch(puts, new Object[puts.size()]);
        }
        boundRecords.forEach(tRecord -> tRecord.ack());
    } catch (Exception e) {
        boundRecords.forEach(tRecord -> tRecord.fail());
        log.error("Hbase table put data exception ", e);
    }
}