in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [366:425]
/**
 * Applies the supplied configuration to this Kafka source.
 *
 * <p>Replaces the header map, the partition/offset map, the rebalance flag and the consumer
 * properties with fresh instances, selects a topic subscriber (a topic regex takes precedence
 * over a topic list), reads the batch, Avro, bootstrap-server, group-id and header settings,
 * and creates the source counter if none exists yet.
 *
 * @param context configuration the source settings are read from
 * @throws ConfigurationException if neither a topic regex nor a topic list is configured and
 *     no subscriber is already set, or if no bootstrap servers are configured
 */
protected void doConfigure(Context context) throws FlumeException {
  this.context = context;

  // Each call starts from fresh state holders.
  headers = new HashMap<String, String>(4);
  tpAndOffsetMetadata = new HashMap<TopicPartition, OffsetAndMetadata>();
  rebalanceFlag = new AtomicBoolean(false);
  kafkaProps = new Properties();

  // Topic selection: the regex wins; the topic list is only consulted when no regex is given.
  final String topicsRegex = context.getString(TOPICS_REGEX);
  if (topicsRegex == null || topicsRegex.isEmpty()) {
    final String topicList = context.getString(TOPICS);
    if (topicList != null && !topicList.isEmpty()) {
      // create subscriber that uses topic list subscription
      subscriber = new TopicListSubscriber(topicList);
    } else if (subscriber == null) {
      // Nothing configured and no subscriber already set: cannot continue.
      throw new ConfigurationException("At least one Kafka topic must be specified.");
    }
  } else {
    // create subscriber that uses pattern-based subscription
    subscriber = new PatternSubscriber(topicsRegex);
  }

  batchUpperLimit = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
  maxBatchDurationMillis = context.getInteger(BATCH_DURATION_MS, DEFAULT_BATCH_DURATION);

  useAvroEventFormat = context.getBoolean(AVRO_EVENT, DEFAULT_AVRO_EVENT);
  if (log.isDebugEnabled()) {
    log.debug(AVRO_EVENT + " set to: {}", useAvroEventFormat);
  }

  bootstrapServers = context.getString(BOOTSTRAP_SERVERS);
  final boolean hasBootstrapServers = bootstrapServers != null && !bootstrapServers.isEmpty();
  if (!hasBootstrapServers) {
    throw new ConfigurationException("Bootstrap Servers must be specified");
  }

  // An explicitly configured consumer group id overrides whatever groupId currently holds.
  final String configuredGroupId =
      context.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
  final boolean groupIdConfigured = configuredGroupId != null && !configuredGroupId.isEmpty();
  if (groupIdConfigured) {
    groupId = configuredGroupId;
  }
  // Fall back to the default when no group id is available at all.
  if (groupId == null || groupId.isEmpty()) {
    groupId = DEFAULT_GROUP_ID;
    log.info("Group ID was not specified. Using {} as the group id.", groupId);
  }

  setTopicHeader = context.getBoolean(SET_TOPIC_HEADER, DEFAULT_SET_TOPIC_HEADER);
  topicHeader = context.getString(TOPIC_HEADER, DEFAULT_TOPIC_HEADER);
  headerMap = context.getSubProperties(KAFKA_HEADER);

  // Runs after bootstrapServers and groupId are settled; kept in the original position.
  setConsumerProps(context);

  // Consumer properties are only printed when debug is on AND config logging is permitted.
  if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
    log.debug("Kafka consumer properties: {}", kafkaProps);
  }

  // Keep an existing counter; only create one when none exists yet.
  if (counter == null) {
    counter = new KafkaSourceCounter(getName());
  }
}