protected void doStart()

in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [475:486]


  protected void doStart() throws FlumeException {
    log.info("Starting {}...", this);

    //initialize a consumer.
    consumer = new KafkaConsumer<String, byte[]>(kafkaProps);

    // Subscribe for topics by already specified strategy
    subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));

    log.info("Kafka source {} started.", getName());
    counter.start();
  }