bool PublishKafka::configureNewConnection()

in extensions/kafka/PublishKafka.cpp [389:526]


bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
  std::array<char, 512U> err_chars{};
  rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
  constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";

  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }

  const auto* const key = conn_->getKey();

  if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); }
  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size());
  logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
  if (result != RD_KAFKA_CONF_OK) {
    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }

  if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty"); }
  result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size());
  logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
  if (result != RD_KAFKA_CONF_OK) {
    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }

  if (const auto debug_context = context.getProperty(DebugContexts)) {
    result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: debug [{}]", *debug_context);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  if (const auto max_message_size = context.getProperty(MaxMessageSize); max_message_size && !max_message_size->empty()) {
    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: message.max.bytes [{}]", *max_message_size);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  if (const auto queue_buffer_max_message = utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
    if (*queue_buffer_max_message < batch_size_) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
    }

    const auto value = std::to_string(*queue_buffer_max_message);
    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", value);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  if (const auto queue_buffer_max_size = utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
    auto valInt = *queue_buffer_max_size / 1024;
    auto valueConf = std::to_string(valInt);
    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", valueConf.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", valueConf);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  if (const auto queue_buffer_max_time = utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
    auto valueConf = std::to_string(queue_buffer_max_time->count());
    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  if (const auto batch_size = utils::parseOptionalU64Property(context, BatchSize)) {
    auto value = std::to_string(*batch_size);
    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  if (const auto compress_codec = context.getProperty(CompressCodec); compress_codec && !compress_codec->empty() && *compress_codec != "none") {
    result = rd_kafka_conf_set(conf_.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size());
    logger_->log_debug("PublishKafka: compression.codec [{}]", *compress_codec);
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  }

  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));

  // Add all the dynamic properties as librdkafka configurations
  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
  logger_->log_info("PublishKafka registering {} librdkafka dynamic properties", dynamic_prop_keys.size());

  for (const auto& prop_key : dynamic_prop_keys) {
    if (const auto dynamic_property_value = context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && !dynamic_property_value->empty()) {
      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, *dynamic_property_value);
      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
      if (result != RD_KAFKA_CONF_OK) {
        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
      }
    } else {
      logger_->log_warn(
          "PublishKafka Dynamic Property '{}' is empty and therefore will not "
          "be configured",
          prop_key);
    }
  }

  // Set the delivery callback
  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);

  // Set the logger callback
  rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);

  // The producer takes ownership of the configuration, we must not free it
  utils::rd_kafka_producer_unique_ptr producer{
      rd_kafka_new(RD_KAFKA_PRODUCER, conf_.release(), err_chars.data(), err_chars.size())};
  if (producer == nullptr) {
    auto error_msg = utils::string::join_pack("Failed to create Kafka producer ", err_chars.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }

  conn_->setConnection(std::move(producer));

  return true;
}