in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [168:214]
public void configure(Context ctx) {
topicStr = ctx.getString(TOPIC_CONFIG);
if (topicStr == null || topicStr.isEmpty()) {
topicStr = DEFAULT_TOPIC;
logger.info("Topic was not specified. Using {} as the topic.", topicStr);
}
topic.set(topicStr);
groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
if (groupId == null || groupId.isEmpty()) {
groupId = DEFAULT_GROUP_ID;
logger.info("Group ID was not specified. Using {} as the group id.", groupId);
}
String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers == null || bootStrapServers.isEmpty()) {
throw new ConfigurationException("Bootstrap Servers must be specified");
}
String transactionalID = ctx.getString(TRANSACTIONAL_ID);
if (transactionalID != null) {
try {
ctx.put(TRANSACTIONAL_ID, InetAddress.getLocalHost().getCanonicalHostName() +
Thread.currentThread().getName() + transactionalID);
useKafkaTransactions = true;
} catch (UnknownHostException e) {
throw new ConfigurationException("Unable to configure transactional id, as cannot work out hostname", e);
}
}
setProducerProps(ctx, bootStrapServers);
setConsumerProps(ctx, bootStrapServers);
parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);
staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF);
partitionHeader = ctx.getString(PARTITION_HEADER_NAME);
if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
logger.debug("Kafka properties: {}", ctx);
}
if (counter == null) {
counter = new KafkaChannelCounter(getName());
}
}