private synchronized ConsumerAndRecords createConsumerAndRecords()

in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [265:278]


  private synchronized ConsumerAndRecords createConsumerAndRecords() {
    try {
      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
      ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
      logger.info("Created new consumer to connect to Kafka");
      car.consumer.subscribe(Arrays.asList(topic.get()),
                             new ChannelRebalanceListener(rebalanceFlag));
      car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
      consumers.add(car);
      return car;
    } catch (Exception e) {
      throw new FlumeException("Unable to connect to Kafka", e);
    }
  }