in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [148:160]
public void stop() {
for (ConsumerAndRecords c : consumers) {
try {
decommissionConsumerAndRecords(c);
} catch (Exception ex) {
logger.warn("Error while shutting down consumer.", ex);
}
}
producer.close();
counter.stop();
super.stop();
logger.info("Kafka channel {} stopped.", getName());
}