protected void doConfigure()

in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [366:425]


  /**
   * Applies the supplied configuration to this source.
   *
   * <p>Resets the per-configuration state (headers, offset map, rebalance flag and Kafka
   * properties), selects a topic subscriber, and then reads the batch, event format,
   * bootstrap server, consumer group and header settings from {@code context}.
   *
   * <p>A non-empty topic regex takes precedence over a topic list. When neither is set,
   * a subscriber already held by this instance is left untouched. Likewise, a group id
   * already held by this instance is kept when the group id property is absent, and the
   * default group id is used only if no group id is available at all.
   *
   * @param context the configuration to read settings from
   * @throws ConfigurationException if no topic setting is present and no subscriber
   *     already exists, or if the bootstrap servers setting is null or empty
   */
  protected void doConfigure(Context context) throws FlumeException {
    this.context = context;

    // Fresh state for this configuration pass.
    headers = new HashMap<>(4);
    tpAndOffsetMetadata = new HashMap<>();
    rebalanceFlag = new AtomicBoolean(false);
    kafkaProps = new Properties();

    // Topic selection: pattern-based subscription wins over an explicit topic list.
    final String topicsRegex = context.getString(TOPICS_REGEX);
    final boolean regexConfigured = topicsRegex != null && !topicsRegex.isEmpty();
    if (regexConfigured) {
      subscriber = new PatternSubscriber(topicsRegex);
    } else {
      // The topic list is only consulted when no regex was supplied.
      final String topicList = context.getString(TOPICS);
      final boolean listConfigured = topicList != null && !topicList.isEmpty();
      if (listConfigured) {
        subscriber = new TopicListSubscriber(topicList);
      } else if (subscriber == null) {
        // Nothing configured and nothing carried over on this instance.
        throw new ConfigurationException("At least one Kafka topic must be specified.");
      }
    }

    // Batching limits and event format.
    batchUpperLimit = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
    maxBatchDurationMillis = context.getInteger(BATCH_DURATION_MS, DEFAULT_BATCH_DURATION);
    useAvroEventFormat = context.getBoolean(AVRO_EVENT, DEFAULT_AVRO_EVENT);
    if (log.isDebugEnabled()) {
      log.debug(AVRO_EVENT + " set to: {}", useAvroEventFormat);
    }

    // Bootstrap servers are mandatory.
    bootstrapServers = context.getString(BOOTSTRAP_SERVERS);
    final boolean bootstrapMissing = bootstrapServers == null || bootstrapServers.isEmpty();
    if (bootstrapMissing) {
      throw new ConfigurationException("Bootstrap Servers must be specified");
    }

    // Consumer group: an explicitly configured value overrides whatever is currently held.
    final String configuredGroupId =
        context.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
    final boolean groupIdConfigured =
        configuredGroupId != null && !configuredGroupId.isEmpty();
    if (groupIdConfigured) {
      groupId = configuredGroupId;
    }

    // Fall back to the default only when no group id is available from any source.
    final boolean groupIdMissing = groupId == null || groupId.isEmpty();
    if (groupIdMissing) {
      groupId = DEFAULT_GROUP_ID;
      log.info("Group ID was not specified. Using {} as the group id.", groupId);
    }

    // Header-related settings.
    setTopicHeader = context.getBoolean(SET_TOPIC_HEADER, DEFAULT_SET_TOPIC_HEADER);
    topicHeader = context.getString(TOPIC_HEADER, DEFAULT_TOPIC_HEADER);
    headerMap = context.getSubProperties(KAFKA_HEADER);

    // Runs after the settings above have been assigned; keep this ordering.
    setConsumerProps(context);

    // Consumer properties are only printed when the privacy utility permits it.
    if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
      log.debug("Kafka consumer properties: {}", kafkaProps);
    }

    // Reuse an existing counter; create one only on first configuration.
    if (counter == null) {
      counter = new KafkaSourceCounter(getName());
    }
  }