private void flush()

in pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java [128:163]


    private void flush() {
        List<Put> puts = new ArrayList<>();
        List<Record<T>>  toFlushList;
        synchronized (this) {
            if (incomingList.isEmpty()) {
                return;
            }
            toFlushList = incomingList;
            incomingList = Lists.newArrayList();
        }

        if (CollectionUtils.isNotEmpty(toFlushList)) {
            for (Record<T> record: toFlushList) {
                try {
                    bindValue(record, puts);
                } catch (Exception e) {
                    record.fail();
                    toFlushList.remove(record);
                    log.warn("Record flush thread was exception ", e);
                }
            }
        }

        try {
            if (CollectionUtils.isNotEmpty(puts)) {
                table.batch(puts, new Object[puts.size()]);
            }

            toFlushList.forEach(tRecord -> tRecord.ack());
            puts.clear();
            toFlushList.clear();
        } catch (Exception e) {
            toFlushList.forEach(tRecord -> tRecord.fail());
            log.error("Hbase table put data exception ", e);
        }
    }