Status SetupStreamsLocked()

in tensorflow_io/core/kernels/kafka_kernels_deprecated.cc [383:506]


      Status SetupStreamsLocked(Env* env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
        if (current_topic_index_ >= dataset()->topics_.size()) {
          return errors::InvalidArgument(
              "current_topic_index_:", current_topic_index_,
              " >= topics_.size():", dataset()->topics_.size());
        }

        // Actually move on to next topic.
        string entry = dataset()->topics_[current_topic_index_];

        std::vector<string> parts = str_util::Split(entry, ":");
        if (parts.size() < 1) {
          return errors::InvalidArgument("Invalid parameters: ", entry);
        }
        string topic = parts[0];
        int32 partition = 0;
        if (parts.size() > 1) {
          if (!strings::safe_strto32(parts[1], &partition)) {
            return errors::InvalidArgument("Invalid parameters: ", entry);
          }
        }
        int64 offset = 0;
        if (parts.size() > 2) {
          if (!strings::safe_strto64(parts[2], &offset)) {
            return errors::InvalidArgument("Invalid parameters: ", entry);
          }
        }

        topic_partition_.reset(
            RdKafka::TopicPartition::create(topic, partition, offset));

        offset_ = topic_partition_->offset();
        limit_ = -1;
        if (parts.size() > 3) {
          if (!strings::safe_strto64(parts[3], &limit_)) {
            return errors::InvalidArgument("Invalid parameters: ", entry);
          }
        }

        std::unique_ptr<RdKafka::Conf> conf(
            RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        std::unique_ptr<RdKafka::Conf> topic_conf(
            RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
        RdKafka::Conf::ConfResult result = RdKafka::Conf::CONF_UNKNOWN;

        std::string errstr;

        for (auto it = dataset()->config_topic_.begin();
             it != dataset()->config_topic_.end(); it++) {
          std::vector<string> parts = str_util::Split(*it, "=");
          if (parts.size() != 2) {
            return errors::InvalidArgument("Invalid topic configuration: ",
                                           *it);
          }
          result = topic_conf->set(parts[0], parts[1], errstr);
          if (result != RdKafka::Conf::CONF_OK) {
            return errors::Internal("Failed to do topic configuration:", *it,
                                    "error:", errstr);
          }
          LOG(INFO) << "Kafka topic configuration: " << *it;
        }

        result = conf->set("default_topic_conf", topic_conf.get(), errstr);
        if (result != RdKafka::Conf::CONF_OK) {
          return errors::Internal("Failed to set default_topic_conf:", errstr);
        }

        for (auto it = dataset()->config_global_.begin();
             it != dataset()->config_global_.end(); it++) {
          std::vector<string> parts = str_util::Split(*it, "=");
          if (parts.size() != 2) {
            return errors::InvalidArgument("Invalid global configuration: ",
                                           *it);
          }
          result = conf->set(parts[0], parts[1], errstr);
          if (result != RdKafka::Conf::CONF_OK) {
            return errors::Internal("Failed to do global configuration: ", *it,
                                    "error:", errstr);
          }
          LOG(INFO) << "Kafka global configuration: " << *it;
        }

        result = conf->set("event_cb", &kafka_event_cb, errstr);
        if (result != RdKafka::Conf::CONF_OK) {
          return errors::Internal("Failed to set event_cb:", errstr);
        }

        result = conf->set("bootstrap.servers", dataset()->servers_, errstr);
        if (result != RdKafka::Conf::CONF_OK) {
          return errors::Internal("Failed to set bootstrap.servers ",
                                  dataset()->servers_, ":", errstr);
        }
        result = conf->set("group.id", dataset()->group_, errstr);
        if (result != RdKafka::Conf::CONF_OK) {
          return errors::Internal("Failed to set group.id ", dataset()->group_,
                                  ":", errstr);
        }

        // Always enable.partition.eof=true
        result = conf->set("enable.partition.eof", "true", errstr);
        if (result != RdKafka::Conf::CONF_OK) {
          return errors::Internal("Failed to set enable.partition.eof=true",
                                  ":", errstr);
        }

        consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
        if (!consumer_.get()) {
          return errors::Internal("Failed to create consumer:", errstr);
        }

        std::vector<RdKafka::TopicPartition*> partitions;
        partitions.emplace_back(topic_partition_.get());
        RdKafka::ErrorCode err = consumer_->assign(partitions);
        if (err != RdKafka::ERR_NO_ERROR) {
          return errors::Internal(
              "Failed to assign partition [", topic_partition_->topic(), ", ",
              topic_partition_->partition(), ", ", topic_partition_->offset(),
              "]:", RdKafka::err2str(err));
        }
        LOG(INFO) << "Kafka stream starts with current offset: "
                  << topic_partition_->offset();

        return Status::OK();
      }