in tensorflow_io/core/kernels/kafka_kernels_deprecated.cc [383:506]
Status SetupStreamsLocked(Env* env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
  if (current_topic_index_ >= dataset()->topics_.size()) {
    return errors::InvalidArgument(
        "current_topic_index_:", current_topic_index_,
        " >= topics_.size():", dataset()->topics_.size());
  }
  // Move on to the next topic. Each entry in topics_ has the form
  // topic[:partition[:offset[:length]]].
  string entry = dataset()->topics_[current_topic_index_];
  std::vector<string> parts = str_util::Split(entry, ":");
  if (parts.empty()) {
    return errors::InvalidArgument("Invalid parameters: ", entry);
  }
  string topic = parts[0];
  int32 partition = 0;
  if (parts.size() > 1) {
    if (!strings::safe_strto32(parts[1], &partition)) {
      return errors::InvalidArgument("Invalid parameters: ", entry);
    }
  }
  int64 offset = 0;
  if (parts.size() > 2) {
    if (!strings::safe_strto64(parts[2], &offset)) {
      return errors::InvalidArgument("Invalid parameters: ", entry);
    }
  }
  topic_partition_.reset(
      RdKafka::TopicPartition::create(topic, partition, offset));
  offset_ = topic_partition_->offset();
  limit_ = -1;
  if (parts.size() > 3) {
    if (!strings::safe_strto64(parts[3], &limit_)) {
      return errors::InvalidArgument("Invalid parameters: ", entry);
    }
  }
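  // Build the librdkafka global (CONF_GLOBAL) and per-topic (CONF_TOPIC)
  // configuration objects before creating the consumer.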
  std::unique_ptr<RdKafka::Conf> conf(
      RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
  std::unique_ptr<RdKafka::Conf> topic_conf(
      RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
  RdKafka::Conf::ConfResult result = RdKafka::Conf::CONF_UNKNOWN;
  std::string errstr;
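  // Apply each dataset-provided topic configuration entry ("key=value") to
  // the per-topic configuration.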
  for (auto it = dataset()->config_topic_.begin();
       it != dataset()->config_topic_.end(); it++) {
    std::vector<string> parts = str_util::Split(*it, "=");
    if (parts.size() != 2) {
      return errors::InvalidArgument("Invalid topic configuration: ", *it);
    }
    result = topic_conf->set(parts[0], parts[1], errstr);
    if (result != RdKafka::Conf::CONF_OK) {
      return errors::Internal("Failed to set topic configuration: ", *it,
                              ", error: ", errstr);
    }
    LOG(INFO) << "Kafka topic configuration: " << *it;
  }
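  // Install the per-topic settings as the default topic configuration for
  // this consumer.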
result = conf->set("default_topic_conf", topic_conf.get(), errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set default_topic_conf:", errstr);
}
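  // Apply each dataset-provided global configuration entry ("key=value") to
  // the consumer configuration.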
  for (auto it = dataset()->config_global_.begin();
       it != dataset()->config_global_.end(); it++) {
    std::vector<string> parts = str_util::Split(*it, "=");
    if (parts.size() != 2) {
      return errors::InvalidArgument("Invalid global configuration: ", *it);
    }
    result = conf->set(parts[0], parts[1], errstr);
    if (result != RdKafka::Conf::CONF_OK) {
      return errors::Internal("Failed to set global configuration: ", *it,
                              ", error: ", errstr);
    }
    LOG(INFO) << "Kafka global configuration: " << *it;
  }
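  // Register the event callback so broker-level events and errors reported by
  // librdkafka are surfaced.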
result = conf->set("event_cb", &kafka_event_cb, errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set event_cb:", errstr);
}
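  // Point the consumer at the configured brokers and consumer group.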
result = conf->set("bootstrap.servers", dataset()->servers_, errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set bootstrap.servers ",
dataset()->servers_, ":", errstr);
}
result = conf->set("group.id", dataset()->group_, errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set group.id ", dataset()->group_,
":", errstr);
}
// Always enable.partition.eof=true
result = conf->set("enable.partition.eof", "true", errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set enable.partition.eof=true",
":", errstr);
}
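  // Create the consumer with the assembled configuration.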
  consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
  if (!consumer_.get()) {
    return errors::Internal("Failed to create consumer:", errstr);
  }
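  // Assign the single topic partition directly (rather than subscribing), so
  // consumption starts at the requested offset without group rebalancing.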
  std::vector<RdKafka::TopicPartition*> partitions;
  partitions.emplace_back(topic_partition_.get());
  RdKafka::ErrorCode err = consumer_->assign(partitions);
  if (err != RdKafka::ERR_NO_ERROR) {
    return errors::Internal(
        "Failed to assign partition [", topic_partition_->topic(), ", ",
        topic_partition_->partition(), ", ", topic_partition_->offset(),
        "]:", RdKafka::err2str(err));
  }
LOG(INFO) << "Kafka stream starts with current offset: "
<< topic_partition_->offset();
return Status::OK();
}