public void configure(Context context)

in flume-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java [345:410]


  public void configure(Context context) {

    String topicStr = context.getString(KafkaSinkConstants.TOPIC_CONFIG);
    if (topicStr == null || topicStr.isEmpty()) {
      topicStr = KafkaSinkConstants.DEFAULT_TOPIC;
      logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
    } else {
      logger.info("Using the static topic {}. This may be overridden by event headers", topicStr);
    }

    topic = topicStr;

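    // Optional: the name of a header used as the record timestamp, and the header mappings
    // taken from the KAFKA_HEADER sub-properties.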
    timestampHeader = context.getString(KafkaSinkConstants.TIMESTAMP_HEADER);

    headerMap = context.getSubProperties(KafkaSinkConstants.KAFKA_HEADER);

    batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
                                   KafkaSinkConstants.DEFAULT_BATCH_SIZE);

    if (logger.isDebugEnabled()) {
      logger.debug("Using batch size: {}", batchSize);
    }

    useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT,
                                            KafkaSinkConstants.DEFAULT_AVRO_EVENT);

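    // Partition selection: a per-event header can carry the target partition id,
    // or a fixed partition id can be configured for every event.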
    partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
    staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);

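    // Topic override: when allowed, the header named by topicHeader redirects individual
    // events to a different topic than the configured default.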
    allowTopicOverride = context.getBoolean(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
                                          KafkaSinkConstants.DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER);

    topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
                                    KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);

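    // A configured transactional id is made unique by prefixing the local host name and the
    // current thread name; setting it also switches the sink to Kafka transactions.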
    String transactionalID = context.getString(KafkaSinkConstants.TRANSACTIONAL_ID);
    if (transactionalID != null) {
      try {
        context.put(KafkaSinkConstants.TRANSACTIONAL_ID, InetAddress.getLocalHost().getCanonicalHostName() +
                Thread.currentThread().getName() + transactionalID);
        useKafkaTransactions = true;
      } catch (UnknownHostException e) {
        throw new ConfigurationException("Unable to configure transactional id, as cannot work out hostname", e);
      }
    }

    if (logger.isDebugEnabled()) {
      logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
    }

    kafkaFutures = new LinkedList<>();

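    // The Kafka bootstrap servers are mandatory; fail configuration early if they are missing.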
    String bootStrapServers = context.getString(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG);
    if (bootStrapServers == null || bootStrapServers.isEmpty()) {
      throw new ConfigurationException("Bootstrap Servers must be specified");
    }

    setProducerProps(context, bootStrapServers);

    if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
      logger.debug("Kafka producer properties: {}", kafkaProps);
    }

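    // Create the sink counter only once so metrics survive reconfiguration.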
    if (counter == null) {
      counter = new KafkaSinkCounter(getName());
    }
  }
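
For reference, the sketch below shows how this configuration path might be exercised programmatically, for example from a test. It relies only on the constants referenced in the method above; the sink name, broker address, topic name, and batch size values are illustrative assumptions.

import org.apache.flume.Context;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.kafka.KafkaSink;
import org.apache.flume.sink.kafka.KafkaSinkConstants;

public class KafkaSinkConfigureSketch {
  public static void main(String[] args) {
    // Build a Context holding the properties that configure(Context) reads.
    Context context = new Context();
    context.put(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // required
    context.put(KafkaSinkConstants.TOPIC_CONFIG, "flume-events");               // falls back to the default topic if omitted
    context.put(KafkaSinkConstants.BATCH_SIZE, "50");                           // read via context.getInteger(...)

    KafkaSink sink = new KafkaSink();
    sink.setName("k1");

    // Configurables.configure(...) delegates to sink.configure(context), i.e. the method shown above.
    Configurables.configure(sink, context);
  }
}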