in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [475:486]
/**
 * Starts this source.
 *
 * <p>In order: builds a new {@code KafkaConsumer} from {@code kafkaProps} and stores it in
 * {@code consumer}, hands that consumer to {@code subscriber} together with a fresh
 * {@code SourceRebalanceListener} wrapping {@code rebalanceFlag}, and finally starts
 * {@code counter}.
 *
 * @throws FlumeException declared by the signature; no statement in this body throws it
 *         explicitly
 */
protected void doStart() throws FlumeException {
  log.info("Starting {}...", this);

  // Create the consumer from the Kafka properties held by this source.
  consumer = new KafkaConsumer<String, byte[]>(kafkaProps);

  // Let the already-selected subscription strategy attach the consumer to its topics;
  // the listener is constructed with this source's rebalance flag.
  final SourceRebalanceListener rebalanceListener = new SourceRebalanceListener(rebalanceFlag);
  subscriber.subscribe(consumer, rebalanceListener);

  // Note: the "started" message is emitted before the counter is started.
  log.info("Kafka source {} started.", getName());
  counter.start();
}