bool PublishKafka::configureNewConnection()

in extensions/librdkafka/PublishKafka.cpp [415:562]


/**
 * Builds a librdkafka producer configuration and creates the producer for conn_.
 *
 * The configuration is assembled from the connection key (brokers, client id), the
 * processor properties read from the context, the authentication parameters applied by
 * setKafkaAuthenticationParameters() and every non-empty dynamic property, which is
 * forwarded to librdkafka under its own name.
 *
 * @param context process context the processor and dynamic properties are read from
 * @return true once the producer has been created and stored in conn_
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the configuration object cannot be
 *         allocated, the brokers or the client id are empty, Queue Max Message is smaller
 *         than the batch size, librdkafka rejects a setting, or the producer cannot be created
 */
bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
  std::array<char, 512U> errstr{};
  const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";

  std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() };
  if (conf_ == nullptr) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
  }

  // Turns a rejected librdkafka setting into a scheduling exception carrying librdkafka's error text.
  const auto throw_if_rejected = [&](rd_kafka_conf_res_t result) {
    if (result != RD_KAFKA_CONF_OK) {
      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
    }
  };

  // Applies a single setting, logs it at debug level (even if it was rejected) and then validates the result.
  const auto set_conf = [&](const char* name, const std::string& setting) {
    const rd_kafka_conf_res_t result = rd_kafka_conf_set(conf_.get(), name, setting.c_str(), errstr.data(), errstr.size());
    logger_->log_debug("PublishKafka: %s [%s]", name, setting);
    throw_if_rejected(result);
  };

  const auto* const key = conn_->getKey();

  if (key->brokers_.empty()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
  }
  set_conf("bootstrap.servers", key->brokers_);

  if (key->client_id_.empty()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
  }
  set_conf("client.id", key->client_id_);

  std::string debug_contexts;
  if (context->getProperty(DebugContexts, debug_contexts) && !debug_contexts.empty()) {
    set_conf("debug", debug_contexts);
  }

  std::string max_message_size;
  if (context->getProperty(MaxMessageSize, max_message_size) && !max_message_size.empty()) {
    set_conf("message.max.bytes", max_message_size);
  }

  uint32_t int_val{};
  if (context->getProperty(QueueBufferMaxMessage, int_val)) {
    // The queue must be able to hold at least one full batch.
    if (int_val < batch_size_) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
    }
    set_conf("queue.buffering.max.messages", std::to_string(int_val));
  }

  std::string queue_buffer_max_size;
  int64_t queue_buffer_max_bytes{};
  if (context->getProperty(QueueBufferMaxSize, queue_buffer_max_size) && !queue_buffer_max_size.empty()
      && core::Property::StringToInt(queue_buffer_max_size, queue_buffer_max_bytes)) {
    // The parsed value is divided by 1024 because the librdkafka setting is expressed in kbytes.
    set_conf("queue.buffering.max.kbytes", std::to_string(queue_buffer_max_bytes / 1024));
  }

  if (auto queue_buffer_max_time = context->getProperty<core::TimePeriodValue>(QueueBufferMaxTime)) {
    set_conf("queue.buffering.max.ms", std::to_string(queue_buffer_max_time->getMilliseconds().count()));
  }

  std::string batch_size;
  if (context->getProperty(BatchSize, batch_size) && !batch_size.empty()) {
    set_conf("batch.num.messages", batch_size);
  }

  std::string compress_codec;
  if (context->getProperty(CompressCodec, compress_codec) && !compress_codec.empty() && compress_codec != "none") {
    set_conf("compression.codec", compress_codec);
  }

  setKafkaAuthenticationParameters(*context, gsl::make_not_null(conf_.get()));

  // Add all of the dynamic properties as librdkafka configurations
  const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
  // size() yields a size_t, so it has to be formatted with %zu rather than %d
  logger_->log_info("PublishKafka registering %zu librdkafka dynamic properties", dynamic_prop_keys.size());

  for (const auto &prop_key : dynamic_prop_keys) {
    core::Property dynamic_property_key{prop_key, "dynamic property"};
    dynamic_property_key.setSupportsExpressionLanguage(true);
    std::string dynamic_property_value;
    if (context->getDynamicProperty(dynamic_property_key, dynamic_property_value, nullptr) && !dynamic_property_value.empty()) {
      logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", prop_key, dynamic_property_value);
      throw_if_rejected(rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value.c_str(), errstr.data(), errstr.size()));
    } else {
      logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", prop_key);
    }
  }

  // Set the delivery callback
  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);

  // Set the logger callback
  rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);

  // rd_kafka_new() takes ownership of the configuration only when it succeeds. Keep conf_
  // owning it during the call so that a failed creation does not leak the configuration.
  gsl::owner<rd_kafka_t*> producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_.get(), errstr.data(), errstr.size());
  if (producer == nullptr) {
    auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer ", errstr.data());
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
  }
  // The producer now owns the configuration, we must not free it
  static_cast<void>(conf_.release());

  conn_->setConnection(producer);

  return true;
}