virtual Status Init()

in tensorflow_io/core/kernels/kafka_kernels.cc [829:954]


  // Configures and creates the Kafka consumer, then subscribes it to `topics`.
  //
  // Each `metadata` entry is a "key=value" configuration string:
  //   * entries whose key starts with "conf.topic." are applied, with that
  //     prefix stripped, to the default topic configuration;
  //   * every other non-empty entry whose key does not start with "conf." is
  //     applied to the global configuration;
  //   * any other "conf."-prefixed entry is ignored by this method.
  // The value is everything after the FIRST '=', so values may contain '='.
  //
  // When not supplied, bootstrap.servers defaults to "localhost:9092",
  // group.id to "test-consumer-group" and batch.num.messages to "1024";
  // enable.partition.eof is always forced to "true".
  //
  // Returns InvalidArgument for a malformed entry or unparsable
  // batch.num.messages, Internal for any librdkafka configuration,
  // consumer-creation or subscription failure, and OK otherwise.
  virtual Status Init(const std::vector<std::string>& topics,
                      const std::vector<std::string>& metadata) {
    mutex_lock l(mu_);

    const string kConfPrefix = "conf.";
    const string kTopicConfPrefix = "conf.topic.";

    // True when `s` begins with `prefix`. A plain substring search would also
    // match "conf." appearing inside a value (e.g. a file path).
    auto has_prefix = [](const string& s, const string& prefix) {
      return s.compare(0, prefix.size(), prefix) == 0;
    };
    // Splits "key=value" at the first '='; returns false if there is no '='.
    auto split_key_value = [](const string& entry, string* key,
                              string* value) {
      const size_t eq = entry.find('=');
      if (eq == string::npos) {
        return false;
      }
      *key = entry.substr(0, eq);
      *value = entry.substr(eq + 1);
      return true;
    };

    std::unique_ptr<RdKafka::Conf> conf(
        RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
    std::unique_ptr<RdKafka::Conf> conf_topic(
        RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));

    string errstr;
    RdKafka::Conf::ConfResult result = RdKafka::Conf::CONF_UNKNOWN;
    string key;
    string value;

    // NOTE(review): the loops below log each entry verbatim, which includes
    // values such as passwords; consider redacting sensitive keys.

    // The default kafka topic configurations are set first before
    // setting the global confs
    for (const string& entry : metadata) {
      if (!has_prefix(entry, kTopicConfPrefix)) {
        continue;
      }
      if (!split_key_value(entry, &key, &value)) {
        return errors::InvalidArgument("invalid topic configuration: ", entry);
      }
      result = conf_topic->set(key.substr(kTopicConfPrefix.size()), value,
                               errstr);
      if (result != RdKafka::Conf::CONF_OK) {
        return errors::Internal("failed to do topic configuration: ", entry,
                                " error: ", errstr);
      }
      LOG(INFO) << "Kafka configuration: " << entry;
    }
    if ((result = conf->set("default_topic_conf", conf_topic.get(), errstr)) !=
        RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to set default_topic_conf:", errstr);
    }

    // Once the `default_topic_conf` is set, the global confs can now be set
    // without any risk of being overwritten.
    // Setting the global confs before setting the `default_topic_conf`
    // results in erratic behaviour.
    for (const string& entry : metadata) {
      if (entry.empty() || has_prefix(entry, kConfPrefix)) {
        continue;
      }
      if (!split_key_value(entry, &key, &value)) {
        return errors::InvalidArgument("invalid global configuration: ",
                                       entry);
      }
      if ((result = conf->set(key, value, errstr)) != RdKafka::Conf::CONF_OK) {
        return errors::Internal("failed to do global configuration: ", entry,
                                " error: ", errstr);
      }
      LOG(INFO) << "Kafka configuration: " << entry;
    }

    // default consumer.properties:
    //   bootstrap.servers=localhost:9092
    //   group.id=test-consumer-group

    string bootstrap_servers;
    if ((result = conf->get("bootstrap.servers", bootstrap_servers)) !=
        RdKafka::Conf::CONF_OK) {
      bootstrap_servers = "localhost:9092";
      if ((result = conf->set("bootstrap.servers", bootstrap_servers,
                              errstr)) != RdKafka::Conf::CONF_OK) {
        return errors::Internal("failed to set bootstrap.servers [",
                                bootstrap_servers, "]:", errstr);
      }
    }
    string group_id;
    if ((result = conf->get("group.id", group_id)) != RdKafka::Conf::CONF_OK) {
      group_id = "test-consumer-group";
      if ((result = conf->set("group.id", group_id, errstr)) !=
          RdKafka::Conf::CONF_OK) {
        return errors::Internal("failed to set group.id [", group_id,
                                "]:", errstr);
      }
    }

    // Set batch.num.messages (default 1024) and mirror it into
    // batch_num_messages_.
    std::string batch_num_messages;
    if ((result = conf->get("batch.num.messages", batch_num_messages)) !=
        RdKafka::Conf::CONF_OK) {
      batch_num_messages = "1024";
      if ((result = conf->set("batch.num.messages", batch_num_messages,
                              errstr)) != RdKafka::Conf::CONF_OK) {
        return errors::Internal("failed to set batch.num.messages [",
                                batch_num_messages, "]:", errstr);
      }
    }
    // Parse into a fixed-width local and check the result instead of
    // scanning "%d" directly into a member whose exact integer type is
    // declared elsewhere; an unparsable value is reported rather than
    // leaving batch_num_messages_ silently unchanged.
    long long parsed_batch_num_messages = 0;
    if (sscanf(batch_num_messages.c_str(), "%lld",
               &parsed_batch_num_messages) != 1) {
      return errors::InvalidArgument("invalid batch.num.messages: ",
                                     batch_num_messages);
    }
    batch_num_messages_ = parsed_batch_num_messages;
    LOG(INFO) << "max num of messages per batch: " << batch_num_messages_;

    // Always set enable.partition.eof=true
    if ((result = conf->set("enable.partition.eof", "true", errstr)) !=
        RdKafka::Conf::CONF_OK) {
      return errors::Internal("Failed to set enable.partition.eof=true :",
                              errstr);
    }

    if ((result = conf->set("event_cb", &kafka_event_cb_, errstr)) !=
        RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to set event_cb:", errstr);
    }

    if ((result = conf->set("rebalance_cb", &kafka_rebalance_cb_, errstr)) !=
        RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to set rebalance_cb:", errstr);
    }

    LOG(INFO) << "Creating the kafka consumer";
    consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
    if (!consumer_.get()) {
      return errors::Internal("failed to create consumer:", errstr);
    }

    for (const string& topic : topics) {
      LOG(INFO) << "Subscribing to the kafka topic: " << topic;
    }
    RdKafka::ErrorCode err = consumer_->subscribe(topics);
    if (err != RdKafka::ERR_NO_ERROR) {
      return errors::Internal("failed to subscribe to topics: ",
                              RdKafka::err2str(err));
    }

    return Status::OK();
  }