in extensions/librdkafka/PublishKafka.cpp [415:562]
bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
std::string value;
int64_t valInt;
std::string valueConf;
std::array<char, 512U> errstr{};
rd_kafka_conf_res_t result;
const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
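// Build the configuration under an RAII guard; ownership is transferred to the producer via conf_.release() below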
std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() };
if (conf_ == nullptr) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
}
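// bootstrap.servers: the initial broker list, taken from the connection key and required to be non-empty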
const auto* const key = conn_->getKey();
if (key->brokers_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
}
result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
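// client.id: identifies this producer to the brokers; an empty id is rejected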
if (key->client_id_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
}
result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
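// debug: optional comma-separated librdkafka debug contexts (e.g. "broker,topic,msg")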
value = "";
if (context->getProperty(DebugContexts, value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "debug", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: debug [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
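// message.max.bytes: upper bound on the size of a single produced message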
value = "";
if (context->getProperty(MaxMessageSize, value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: message.max.bytes [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
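// queue.buffering.max.messages: capacity of the local producer queue; it must be able to hold at least one full batch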
value = "";
uint32_t int_val;
if (context->getProperty(QueueBufferMaxMessage, int_val)) {
if (int_val < batch_size_) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
}
value = std::to_string(int_val);
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
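// queue.buffering.max.kbytes: the property value is parsed as bytes, so divide by 1024 before handing it to librdkafka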
value = "";
if (context->getProperty(QueueBufferMaxSize, value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valInt = valInt / 1024;
valueConf = std::to_string(valInt);
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
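// queue.buffering.max.ms: how long to buffer messages locally before sending a batch (linger time)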
if (auto queue_buffer_max_time = context->getProperty<core::TimePeriodValue>(QueueBufferMaxTime)) {
valueConf = std::to_string(queue_buffer_max_time->getMilliseconds().count());
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
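// batch.num.messages: maximum number of messages batched into one MessageSet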
value = "";
if (context->getProperty(BatchSize, value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
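// compression.codec: only set when a codec other than the librdkafka default "none" is requested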
value = "";
if (context->getProperty(CompressCodec, value) && !value.empty() && value != "none") {
result = rd_kafka_conf_set(conf_.get(), "compression.codec", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: compression.codec [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
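// Apply the Kafka security and authentication settings (e.g. SSL, SASL) to the configuration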
setKafkaAuthenticationParameters(*context, gsl::make_not_null(conf_.get()));
// Add all of the dynamic properties as librdkafka configurations
const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
for (const auto &prop_key : dynamic_prop_keys) {
core::Property dynamic_property_key{prop_key, "dynamic property"};
dynamic_property_key.setSupportsExpressionLanguage(true);
std::string dynamic_property_value;
if (context->getDynamicProperty(dynamic_property_key, dynamic_property_value, nullptr) && !dynamic_property_value.empty()) {
logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", prop_key, dynamic_property_value);
result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value.c_str(), errstr.data(), errstr.size());
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
} else {
logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", prop_key);
}
}
// Set the delivery callback
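// (librdkafka invokes it from rd_kafka_poll()/rd_kafka_flush() once the broker acknowledges or rejects a message)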
rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);
// Set the logger callback
rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
// The producer takes ownership of the configuration, so we must not free it ourselves
gsl::owner<rd_kafka_t*> producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_.release(), errstr.data(), errstr.size());
if (producer == nullptr) {
auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer ", errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
conn_->setConnection(producer);
return true;
}