in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [265:278]
/**
 * Creates a new Kafka consumer, subscribes it to the channel topic with a
 * {@code ChannelRebalanceListener}, and registers the resulting
 * {@code ConsumerAndRecords} in {@code consumers}.
 *
 * <p>If any step after the consumer has been constructed fails, the consumer
 * is closed before the failure is reported, so a failed attempt does not leave
 * an unreferenced, still-open consumer behind.
 *
 * @return the newly created and registered {@code ConsumerAndRecords}
 * @throws FlumeException if the consumer cannot be created, subscribed or
 *         registered; the original exception is preserved as the cause
 */
private synchronized ConsumerAndRecords createConsumerAndRecords() {
  // Declared outside the try so the catch block can release it on failure.
  KafkaConsumer<String, byte[]> consumer = null;
  try {
    consumer = new KafkaConsumer<String, byte[]>(consumerProps);
    ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
    logger.info("Created new consumer to connect to Kafka");
    car.consumer.subscribe(Arrays.asList(topic.get()),
        new ChannelRebalanceListener(rebalanceFlag));
    car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    consumers.add(car);
    return car;
  } catch (Exception e) {
    // The consumer was never added to 'consumers' (or the add itself failed),
    // so nothing else can close it later; release it here.
    if (consumer != null) {
      try {
        consumer.close();
      } catch (Exception closeFailure) {
        // Best effort only: the original failure is the one worth reporting.
        logger.warn("Error while closing Kafka consumer after failed setup", closeFailure);
      }
    }
    throw new FlumeException("Unable to connect to Kafka", e);
  }
}