in tensorflow_io/core/kernels/kafka_kernels.cc [829:954]
// Configures a librdkafka consumer and subscribes it to `topics`.
//
// `metadata` holds "key=value" configuration entries:
//   * entries starting with "conf.topic." are applied, with that prefix
//     stripped, to the default topic configuration;
//   * other non-empty entries that do not start with "conf." are applied to
//     the global configuration;
//   * any remaining "conf."-prefixed entries are not handled here.
// Each entry is split on its FIRST '=', so values may themselves contain '='.
//
// When the user did not set them, these defaults are applied:
// bootstrap.servers=localhost:9092, group.id=test-consumer-group and
// batch.num.messages=1024. enable.partition.eof is always forced to "true".
// The resource's event and rebalance callbacks are installed.
//
// Returns InvalidArgument for an entry without '='. Returns Internal for any
// librdkafka configuration, consumer-creation or subscription failure.
// Returns OK otherwise.
virtual Status Init(const std::vector<std::string>& topics,
                    const std::vector<std::string>& metadata) {
  mutex_lock l(mu_);
  std::unique_ptr<RdKafka::Conf> conf(
      RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
  std::unique_ptr<RdKafka::Conf> conf_topic(
      RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
  string errstr;
  RdKafka::Conf::ConfResult result = RdKafka::Conf::CONF_UNKNOWN;

  const std::string conf_prefix = "conf.";
  const std::string topic_conf_prefix = "conf.topic.";

  // True iff `entry` begins with `prefix` (a real prefix test, not a search).
  auto starts_with = [](const std::string& entry, const std::string& prefix) {
    return entry.compare(0, prefix.size(), prefix) == 0;
  };
  // Splits "key=value" on the first '=' so that values containing '='
  // (e.g. SASL credentials) stay intact. Returns false if there is no '='.
  auto split_key_value = [](const std::string& entry, std::string* key,
                            std::string* value) -> bool {
    const std::string::size_type pos = entry.find('=');
    if (pos == std::string::npos) return false;
    *key = entry.substr(0, pos);
    *value = entry.substr(pos + 1);
    return true;
  };

  // The default kafka topic configurations are set first, before
  // setting the global confs.
  for (const auto& entry : metadata) {
    if (!starts_with(entry, topic_conf_prefix)) continue;
    std::string key;
    std::string value;
    if (!split_key_value(entry, &key, &value)) {
      return errors::InvalidArgument("invalid topic configuration: ", entry);
    }
    // `key` always begins with the prefix (the prefix contains no '='), so
    // stripping it is safe.
    result = conf_topic->set(key.substr(topic_conf_prefix.size()), value,
                             errstr);
    if (result != RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to do topic configuration: ", entry,
                              " error: ", errstr);
    }
    // NOTE(review): logs the full entry, including any secret value.
    LOG(INFO) << "Kafka configuration: " << entry;
  }
  if ((result = conf->set("default_topic_conf", conf_topic.get(), errstr)) !=
      RdKafka::Conf::CONF_OK) {
    return errors::Internal("failed to set default_topic_conf:", errstr);
  }
  // Once the `default_topic_conf` is set, the global confs can now be set
  // without any risk of being overwritten.
  // Setting the global confs before setting the `default_topic_conf`
  // results in erratic behaviour.
  for (const auto& entry : metadata) {
    // Only a leading "conf." marks an entry as non-global; a value that
    // merely contains "conf." (e.g. a path) is still a global setting.
    if (entry.empty() || starts_with(entry, conf_prefix)) continue;
    std::string key;
    std::string value;
    if (!split_key_value(entry, &key, &value)) {
      return errors::InvalidArgument("invalid global configuration: ", entry);
    }
    if ((result = conf->set(key, value, errstr)) != RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to do global configuration: ", entry,
                              " error: ", errstr);
    }
    // NOTE(review): logs the full entry, including any secret value.
    LOG(INFO) << "Kafka configuration: " << entry;
  }
  // default consumer.properties:
  // bootstrap.servers=localhost:9092
  // group.id=test-consumer-group
  string bootstrap_servers;
  if ((result = conf->get("bootstrap.servers", bootstrap_servers)) !=
      RdKafka::Conf::CONF_OK) {
    bootstrap_servers = "localhost:9092";
    if ((result = conf->set("bootstrap.servers", bootstrap_servers,
                            errstr)) != RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to set bootstrap.servers [",
                              bootstrap_servers, "]:", errstr);
    }
  }
  string group_id;
  if ((result = conf->get("group.id", group_id)) != RdKafka::Conf::CONF_OK) {
    group_id = "test-consumer-group";
    if ((result = conf->set("group.id", group_id, errstr)) !=
        RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to set group.id [", group_id,
                              "]:", errstr);
    }
  }
  // Default batch.num.messages to 1024 when the user did not set it, then
  // mirror the effective value into `batch_num_messages_`.
  std::string batch_num_messages;
  if ((result = conf->get("batch.num.messages", batch_num_messages)) !=
      RdKafka::Conf::CONF_OK) {
    batch_num_messages = "1024";
    if ((result = conf->set("batch.num.messages", batch_num_messages,
                            errstr)) != RdKafka::Conf::CONF_OK) {
      return errors::Internal("failed to set batch.num.messages [",
                              batch_num_messages, "]:", errstr);
    }
  }
  sscanf(batch_num_messages.c_str(), "%d", &batch_num_messages_);
  LOG(INFO) << "max num of messages per batch: " << batch_num_messages_;
  // Always set enable.partition.eof=true
  if ((result = conf->set("enable.partition.eof", "true", errstr)) !=
      RdKafka::Conf::CONF_OK) {
    return errors::Internal("Failed to set enable.partition.eof=true :",
                            errstr);
  }
  if ((result = conf->set("event_cb", &kafka_event_cb_, errstr)) !=
      RdKafka::Conf::CONF_OK) {
    return errors::Internal("failed to set event_cb:", errstr);
  }
  if ((result = conf->set("rebalance_cb", &kafka_rebalance_cb_, errstr)) !=
      RdKafka::Conf::CONF_OK) {
    return errors::Internal("failed to set rebalance_cb:", errstr);
  }
  LOG(INFO) << "Creating the kafka consumer";
  consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
  if (!consumer_.get()) {
    return errors::Internal("failed to create consumer:", errstr);
  }
  for (const auto& topic : topics) {
    LOG(INFO) << "Subscribing to the kafka topic: " << topic;
  }
  RdKafka::ErrorCode err = consumer_->subscribe(topics);
  if (err != RdKafka::ERR_NO_ERROR) {
    return errors::Internal("failed to subscribe to topics: ",
                            RdKafka::err2str(err));
  }
  return Status::OK();
}