public void close()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [382:401]


    /**
     * Closes and unregisters the writers of the given topic partitions.
     *
     * <p>Works in two passes: every writer is stopped first so that no further
     * ingestions are triggered by timer flushes, and only then is each writer
     * closed and removed from {@code writers} and {@code assignment}.
     *
     * <p>A partition without a registered writer is skipped with a warning
     * instead of failing with a {@link NullPointerException}, so the remaining
     * partitions are still closed. A {@code ConnectException} raised while
     * closing a writer is logged and does not stop the other partitions from
     * being processed; in that case the failing writer is left registered.
     *
     * @param partitions the topic partitions whose writers should be closed
     */
    public void close(Collection<TopicPartition> partitions) {
        log.warn("Closing writers in KustoSinkTask");
        // First stop so that no more ingestions trigger from timer flushes
        for (TopicPartition tp : partitions) {
            if (writers.get(tp) != null) {
                writers.get(tp).stop();
            }
        }
        for (TopicPartition tp : partitions) {
            if (writers.get(tp) == null) {
                // Nothing to close; keep going so the other partitions are still released.
                log.warn("No writer found for topic partition {}, skipping close.", tp);
                continue;
            }
            try {
                writers.get(tp).close();
                // TODO: if we still get duplicates from rebalance - consider keeping writers
                // aside - we might
                // just get the same topic partition again
                writers.remove(tp);
                assignment.remove(tp);
            } catch (ConnectException e) {
                log.error("Error closing topic partition for {}.", tp, e);
            }
        }
    }