in pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java [70:105]
/**
 * Drains the pending records, converts each one with {@code buildPoint} and
 * hands the resulting batch to {@code writePoints}.
 *
 * <p>The pending list is swapped for a fresh one while holding this object's
 * monitor; conversion and writing then run on the detached list outside the
 * lock.
 *
 * <p>Outcome per record:
 * <ul>
 *   <li>{@code buildPoint} throws: the record is failed immediately and is
 *       left out of the batch;</li>
 *   <li>the batch step (write + acks) completes: every converted record is
 *       acked;</li>
 *   <li>the batch step throws: every converted record is failed.</li>
 * </ul>
 *
 * <p>Exceptions from conversion, writing and acking are logged and not
 * rethrown.
 */
private void flush() {
    List<Record<R>> toFlushList;
    synchronized (this) {
        if (incomingList.isEmpty()) {
            return;
        }
        toFlushList = incomingList;
        incomingList = Lists.newArrayList();
    }

    // Records whose point was built successfully, kept index-aligned with
    // 'points'. Tracking them in a separate list avoids removing elements
    // from toFlushList while iterating over it, which would trigger a
    // ConcurrentModificationException (or silently skip the last record).
    List<Record<R>> convertedRecords = Lists.newArrayListWithExpectedSize(toFlushList.size());
    List<T> points = Lists.newArrayListWithExpectedSize(toFlushList.size());
    for (Record<R> record : toFlushList) {
        try {
            points.add(buildPoint(record));
            convertedRecords.add(record);
        } catch (Exception e) {
            // This record cannot be written; fail it now and keep going so
            // that one bad record does not block the rest of the batch.
            record.fail();
            log.warn("Record flush thread was exception ", e);
        }
    }

    try {
        if (CollectionUtils.isNotEmpty(points)) {
            writePoints(points);
        }
        // Only records that actually made it into the written batch are acked.
        convertedRecords.forEach(Record::ack);
        points.clear();
    } catch (Exception e) {
        // Records that failed conversion were already failed above, so only
        // the converted ones are failed here.
        convertedRecords.forEach(Record::fail);
        log.error("InfluxDB write batch data exception ", e);
    }
}