in flume-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java [345:410]
public void configure(Context context) {
String topicStr = context.getString(KafkaSinkConstants.TOPIC_CONFIG);
if (topicStr == null || topicStr.isEmpty()) {
topicStr = KafkaSinkConstants.DEFAULT_TOPIC;
logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
} else {
logger.info("Using the static topic {}. This may be overridden by event headers", topicStr);
}
topic = topicStr;
timestampHeader = context.getString(KafkaSinkConstants.TIMESTAMP_HEADER);
headerMap = context.getSubProperties(KafkaSinkConstants.KAFKA_HEADER);
batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE, KafkaSinkConstants.DEFAULT_BATCH_SIZE);
if (logger.isDebugEnabled()) {
logger.debug("Using batch size: {}", batchSize);
}
useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT,
KafkaSinkConstants.DEFAULT_AVRO_EVENT);
partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);
allowTopicOverride = context.getBoolean(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
KafkaSinkConstants.DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER);
topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);
String transactionalID = context.getString(KafkaSinkConstants.TRANSACTIONAL_ID);
if (transactionalID != null) {
try {
context.put(KafkaSinkConstants.TRANSACTIONAL_ID, InetAddress.getLocalHost().getCanonicalHostName() +
Thread.currentThread().getName() + transactionalID);
useKafkaTransactions = true;
} catch (UnknownHostException e) {
throw new ConfigurationException("Unable to configure transactional id, as cannot work out hostname", e);
}
}
if (logger.isDebugEnabled()) {
logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
}
kafkaFutures = new LinkedList<Future<RecordMetadata>>();
String bootStrapServers = context.getString(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers == null || bootStrapServers.isEmpty()) {
throw new ConfigurationException("Bootstrap Servers must be specified");
}
setProducerProps(context, bootStrapServers);
if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
logger.debug("Kafka producer properties: {}", kafkaProps);
}
if (counter == null) {
counter = new KafkaSinkCounter(getName());
}
}