private void flush()

in pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java [70:105]


    /**
     * Drains the currently buffered records and writes them as one batch.
     *
     * <p>The pending list is swapped for a fresh one while holding this object's
     * monitor; the conversion and write happen outside the lock on the detached
     * list. Each record is converted with {@code buildPoint}; a record whose
     * conversion throws is failed immediately and left out of the batch. The
     * remaining points are passed to {@code writePoints}. If that succeeds the
     * corresponding records are acked; if it throws, those records are failed.
     */
    private void flush() {
        List<Record<R>> toFlushList;

        synchronized (this) {
            if (incomingList.isEmpty()) {
                return;
            }
            // Detach the pending batch and install an empty list in its place.
            toFlushList = incomingList;
            incomingList = Lists.newArrayList();
        }

        val points = Lists.<T>newArrayListWithExpectedSize(toFlushList.size());
        // Records whose point was built successfully. They are tracked in a separate
        // list rather than removing failures from toFlushList while iterating over it,
        // which would raise ConcurrentModificationException and abort the whole flush.
        List<Record<R>> builtRecords = Lists.newArrayListWithExpectedSize(toFlushList.size());
        for (Record<R> record : toFlushList) {
            try {
                points.add(buildPoint(record));
                builtRecords.add(record);
            } catch (Exception e) {
                // This record cannot be converted: fail it now and keep going with the rest.
                record.fail();
                log.warn("Record flush thread was exception ", e);
            }
        }

        try {
            if (!points.isEmpty()) {
                writePoints(points);
            }
            builtRecords.forEach(Record::ack);
            // Drop the references held by the local batch lists once everything is acked.
            points.clear();
            builtRecords.clear();
            toFlushList.clear();
        } catch (Exception e) {
            // Records that failed conversion were already failed above; only the ones
            // that were part of the attempted write are failed here.
            builtRecords.forEach(Record::fail);
            log.error("InfluxDB write batch data exception ", e);
        }
    }