in extensions/kafka/PublishKafka.cpp [389:526]
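// Builds a fresh librdkafka configuration from the processor's properties,
// creates the producer handle, and stores it on the connection.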
bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
  std::array<char, 512U> err_chars{};
  rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
  constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }
  const auto* const key = conn_->getKey();
  if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); }
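  // bootstrap.servers takes a comma-separated list of brokers used for the initial cluster connection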
  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size());
  logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
  if (result != RD_KAFKA_CONF_OK) {
    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }
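  // client.id identifies this producer in broker-side request logs and metrics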
  if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty"); }
  result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size());
  logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
  if (result != RD_KAFKA_CONF_OK) {
    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }
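  // "debug" selects librdkafka debug contexts, e.g. "broker,topic,msg"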
  if (const auto debug_context = context.getProperty(DebugContexts)) {
    result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: debug [{}]", *debug_context);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
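  // message.max.bytes limits the maximum size of a single produced message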
  if (const auto max_message_size = context.getProperty(MaxMessageSize); max_message_size && !max_message_size->empty()) {
    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: message.max.bytes [{}]", *max_message_size);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
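  // queue.buffering.max.messages bounds the local producer queue; it must be able to hold
  // at least one full batch, hence the check against batch_size_ below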
  if (const auto queue_buffer_max_message = utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
    if (*queue_buffer_max_message < batch_size_) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
    }
    const auto value = std::to_string(*queue_buffer_max_message);
    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", value);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
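  // queue.buffering.max.kbytes is specified in kilobytes, so the parsed byte count is divided by 1024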
  if (const auto queue_buffer_max_size = utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
    const auto value_kbytes = *queue_buffer_max_size / 1024;
    const auto value = std::to_string(value_kbytes);
    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", value.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", value);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
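  // queue.buffering.max.ms (a.k.a. linger.ms) is how long messages are buffered before a batch is sent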
  if (const auto queue_buffer_max_time = utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
    const auto value = std::to_string(queue_buffer_max_time->count());
    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", value.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", value);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
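  // batch.num.messages caps how many messages are grouped into one message set per partition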
  if (const auto batch_size = utils::parseOptionalU64Property(context, BatchSize)) {
    const auto value = std::to_string(*batch_size);
    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
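  // compression.codec accepts gzip, snappy, lz4 or zstd (depending on the librdkafka build);
  // "none" is the default, so it does not need to be set explicitly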
  if (const auto compress_codec = context.getProperty(CompressCodec); compress_codec && !compress_codec->empty() && *compress_codec != "none") {
    result = rd_kafka_conf_set(conf_.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: compression.codec [{}]", *compress_codec);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }
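  // Apply the SSL/SASL authentication related properties to the config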
  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
  // Add all the dynamic properties as librdkafka configuration options
  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
  logger_->log_info("PublishKafka registering {} librdkafka dynamic properties", dynamic_prop_keys.size());
  for (const auto& prop_key : dynamic_prop_keys) {
    if (const auto dynamic_property_value = context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && !dynamic_property_value->empty()) {
      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, *dynamic_property_value);
      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
      if (result != RD_KAFKA_CONF_OK) {
        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
      }
    } else {
      logger_->log_warn("PublishKafka Dynamic Property '{}' is empty and therefore will not be configured", prop_key);
    }
  }
  // Set the delivery report callback
  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);
  // Set the logger callback
  rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
  // The producer takes ownership of the configuration; we must not free it ourselves
  utils::rd_kafka_producer_unique_ptr producer{
      rd_kafka_new(RD_KAFKA_PRODUCER, conf_.release(), err_chars.data(), err_chars.size())};
  if (producer == nullptr) {
    auto error_msg = utils::string::join_pack("Failed to create Kafka producer ", err_chars.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }
  conn_->setConnection(std::move(producer));
  return true;
}