public void configure()

in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [168:214]


  /**
   * Applies the channel settings found in the supplied context.
   *
   * <p>The topic and the consumer group id fall back to {@code DEFAULT_TOPIC} and
   * {@code DEFAULT_GROUP_ID} when they are missing or empty. When a transactional id is
   * present, the context entry is overwritten with the canonical local host name, the
   * current thread name and the configured id concatenated in that order, and Kafka
   * transactions are switched on. The (possibly updated) context is then handed to
   * {@code setProducerProps} and {@code setConsumerProps}, after which the remaining
   * options are read and the channel counter is created if it does not exist yet.
   *
   * @param ctx the context to read settings from; its transactional id entry is rewritten
   *            in place when one is configured
   * @throws ConfigurationException if no bootstrap servers are configured, or if the local
   *                                host name cannot be resolved while building the
   *                                transactional id
   */
  public void configure(Context ctx) {
    // Topic: use the configured value, or fall back to the default and say so.
    final String configuredTopic = ctx.getString(TOPIC_CONFIG);
    final boolean topicMissing = configuredTopic == null || configuredTopic.isEmpty();
    topicStr = topicMissing ? DEFAULT_TOPIC : configuredTopic;
    if (topicMissing) {
      logger.info("Topic was not specified. Using {} as the topic.", topicStr);
    }
    topic.set(topicStr);

    // Consumer group id: same fallback scheme as the topic.
    final String configuredGroup =
        ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
    final boolean groupMissing = configuredGroup == null || configuredGroup.isEmpty();
    groupId = groupMissing ? DEFAULT_GROUP_ID : configuredGroup;
    if (groupMissing) {
      logger.info("Group ID was not specified. Using {} as the group id.", groupId);
    }

    // Bootstrap servers are mandatory; there is no default to fall back to.
    final String brokers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG);
    if (brokers == null || brokers.isEmpty()) {
      throw new ConfigurationException("Bootstrap Servers must be specified");
    }

    // A configured transactional id is prefixed with host name and thread name and written
    // back into the context before the producer/consumer properties are derived from it.
    final String configuredTxId = ctx.getString(TRANSACTIONAL_ID);
    if (configuredTxId != null) {
      final String canonicalHost;
      try {
        canonicalHost = InetAddress.getLocalHost().getCanonicalHostName();
      } catch (UnknownHostException e) {
        throw new ConfigurationException(
            "Unable to configure transactional id, as cannot work out hostname", e);
      }
      final String threadName = Thread.currentThread().getName();
      ctx.put(TRANSACTIONAL_ID, canonicalHost + threadName + configuredTxId);
      useKafkaTransactions = true;
    }

    setProducerProps(ctx, brokers);
    setConsumerProps(ctx, brokers);

    parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
    pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);

    staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF);
    partitionHeader = ctx.getString(PARTITION_HEADER_NAME);

    // Only dump the configuration when debug logging is on and config printing is permitted.
    if (logger.isDebugEnabled()) {
      if (LogPrivacyUtil.allowLogPrintConfig()) {
        logger.debug("Kafka properties: {}", ctx);
      }
    }

    // Keep an already created counter; only build one when none exists yet.
    if (counter == null) {
      counter = new KafkaChannelCounter(getName());
    }
  }