in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [382:401]
/**
 * Stops and closes the writers of the given topic partitions, then drops each
 * successfully closed partition from {@code writers} and {@code assignment}.
 *
 * <p>All writers are stopped in a first pass, before any of them is closed.
 * A partition that has no registered writer is skipped with a warning instead
 * of failing the whole call. A {@link ConnectException} raised while closing
 * one writer is logged and does not prevent the remaining partitions from
 * being closed.
 *
 * @param partitions the topic partitions whose writers should be closed
 */
public void close(Collection<TopicPartition> partitions) {
    log.warn("Closing writers in KustoSinkTask");
    // First stop so that no more ingestions trigger from timer flushes
    for (TopicPartition tp : partitions) {
        // Guard the lookup: assumes writers.get returns null for an unknown
        // partition (Map semantics) - dereferencing it would abort the call.
        if (writers.get(tp) != null) {
            writers.get(tp).stop();
        }
    }
    for (TopicPartition tp : partitions) {
        if (writers.get(tp) == null) {
            log.warn("No writer found for {} while closing; skipping.", tp);
            // Nothing to close, but the partition is no longer ours either.
            assignment.remove(tp);
            continue;
        }
        try {
            writers.get(tp).close();
            // TODO: if we still get duplicates from rebalance - consider keeping writers
            // aside - we might just get the same topic partition again
            writers.remove(tp);
            assignment.remove(tp);
        } catch (ConnectException e) {
            // NOTE(review): on failure the writer stays registered in writers and
            // assignment, as before - confirm whether it should be evicted as well.
            log.error("Error closing topic partition for {}.", tp, e);
        }
    }
}