extensions/kafka/rdkafka_utils.cpp (80 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "rdkafka_utils.h"

#include <array>

#include "Exception.h"
#include "utils/StringUtils.h"

namespace org::apache::nifi::minifi::utils {

/**
 * Sets a single configuration key on an rd_kafka configuration object.
 *
 * @param configuration the librdkafka configuration to mutate
 * @param field_name    the configuration property name (e.g. "bootstrap.servers")
 * @param value         the property value, as a string
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if librdkafka rejects the name or value
 */
void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const std::string& field_name, const std::string& value) {
  // NOTE: this buffer used to be 'static', which made concurrent calls race on
  // the error text; a per-call buffer is cheap and thread-safe.
  std::array<char, 512U> errstr{};
  const rd_kafka_conf_res_t result =
      rd_kafka_conf_set(&configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size());
  if (RD_KAFKA_CONF_OK != result) {
    const std::string error_msg{errstr.data()};
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error: " + error_msg);
  }
}

/**
 * Logs (at debug level) every topic/partition/offset entry of a partition list.
 *
 * @param logger                  the logger to write to
 * @param kf_topic_partition_list the librdkafka partition list to dump
 */
void print_topics_list(core::logging::Logger& logger, const rd_kafka_topic_partition_list_t& kf_topic_partition_list) {
  for (int i = 0; i < kf_topic_partition_list.cnt; ++i) {
    logger.log_debug("kf_topic_partition_list: topic: {}, partition: {}, offset: {}.",
        kf_topic_partition_list.elems[i].topic,
        kf_topic_partition_list.elems[i].partition,
        kf_topic_partition_list.elems[i].offset);
  }
}

/**
 * Renders a Kafka message timestamp as human-readable text.
 *
 * A timestamp of -1 means the broker supplied no timestamp; in that case the
 * "seconds ago" value is reported as 0.
 *
 * NOTE(review): the returned string intentionally leaves the leading
 * "[Timestamp](" unbalanced — the caller (print_kafka_message) appends the
 * closing "), ". Do not "fix" the bracketing here without updating the caller.
 *
 * @param rkmessage the message whose timestamp is rendered
 * @return a partial "[Timestamp](..." segment, closed by the caller
 */
std::string get_human_readable_kafka_message_timestamp(const rd_kafka_message_t& rkmessage) {
  rd_kafka_timestamp_type_t tstype{};
  int64_t timestamp = rd_kafka_message_timestamp(&rkmessage, &tstype);
  const char* tsname = "?";
  if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
    tsname = "create time";
  } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
    tsname = "log append time";
  }
  // librdkafka timestamps are in milliseconds; -1 means "not available".
  const int64_t seconds_since_timestamp =
      timestamp == -1 ? 0 : static_cast<int64_t>(time(nullptr)) - static_cast<int64_t>(timestamp / 1000);
  return {"[Timestamp](" + std::string(tsname) + " " + std::to_string(timestamp)
      + " (" + std::to_string(seconds_since_timestamp) + " s ago)"};
}

/**
 * Renders the headers of a Kafka message as "key: value" pairs joined by ", ".
 *
 * @param rkmessage the message whose headers are rendered
 * @param logger    used to report header-fetch failures
 * @return the joined header list, "[None]" if the message has no headers,
 *         or "[Error]" if librdkafka failed to fetch them
 */
std::string get_human_readable_kafka_message_headers(const rd_kafka_message_t& rkmessage, core::logging::Logger& logger) {
  rd_kafka_headers_t* hdrs = nullptr;
  const rd_kafka_resp_err_t get_header_response = rd_kafka_message_headers(&rkmessage, &hdrs);
  if (RD_KAFKA_RESP_ERR_NO_ERROR == get_header_response) {
    std::vector<std::string> header_list;
    kafka_headers_for_each(*hdrs, [&](const std::string& key, std::span<const char> val) {
      header_list.emplace_back(key + ": " + std::string{val.data(), val.size()});
    });
    return string::join(", ", header_list);
  }
  if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) {
    return "[None]";
  }
  logger.log_error("Failed to fetch message headers: {}: {}",
      magic_enum::enum_underlying(rd_kafka_last_error()), rd_kafka_err2str(rd_kafka_last_error()));
  return "[Error]";
}

/**
 * Logs (at debug level) a full human-readable dump of a consumed Kafka message:
 * topic, key, offset, length, timestamp, headers and payload.
 *
 * @param rkmessage the message to dump; must carry RD_KAFKA_RESP_ERR_NO_ERROR
 * @param logger    the logger to write to
 * @throws minifi::Exception (PROCESSOR_EXCEPTION) if the message carries a broker error
 */
void print_kafka_message(const rd_kafka_message_t& rkmessage, core::logging::Logger& logger) {
  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
    const std::string error_msg = "ConsumeKafka: received error message from broker. Librdkafka error msg: "
        + std::string(rd_kafka_err2str(rkmessage.err));
    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);
  }
  std::string topicName = rd_kafka_topic_name(rkmessage.rkt);
  // A tombstone/empty message may have a null payload; constructing
  // std::string(nullptr, len) is undefined behavior, so guard against it.
  std::string message = rkmessage.payload != nullptr
      ? std::string(static_cast<char*>(rkmessage.payload), rkmessage.len)
      : std::string{};
  const char* key = static_cast<const char*>(rkmessage.key);
  const std::size_t key_len = rkmessage.key_len;
  std::string message_as_string;
  message_as_string += "[Topic](" + topicName + "), ";
  message_as_string += "[Key](" + (key != nullptr ? std::string(key, key_len) : std::string("[None]")) + "), ";
  message_as_string += "[Offset](" + std::to_string(rkmessage.offset) + "), ";
  message_as_string += "[Message Length](" + std::to_string(rkmessage.len) + "), ";
  // The timestamp helper returns an unbalanced "[Timestamp](..." segment on
  // purpose; the "), " appended here closes it.
  message_as_string += get_human_readable_kafka_message_timestamp(rkmessage) + "), ";
  message_as_string += "[Headers](";
  // Separator after the headers segment was missing, making the dump
  // inconsistent with every other "[Section](...), " segment.
  message_as_string += get_human_readable_kafka_message_headers(rkmessage, logger) + "), ";
  message_as_string += "[Payload](" + message + ")";
  logger.log_debug("Message: {}", message_as_string.c_str());
}

/**
 * Encodes a string according to the requested KafkaEncoding.
 *
 * @param input    the raw bytes to encode
 * @param encoding UTF8 (pass-through) or HEX (uppercase hex dump)
 * @return the encoded string
 * @throws std::runtime_error for an unrecognized encoding value
 */
std::string get_encoded_string(const std::string& input, KafkaEncoding encoding) {
  switch (encoding) {
    case KafkaEncoding::UTF8:
      return input;
    case KafkaEncoding::HEX:
      return string::to_hex(input, /* uppercase = */ true);
  }
  throw std::runtime_error("Invalid encoding selected: " + input);
}

/**
 * Returns the message key encoded per the requested encoding, or std::nullopt
 * when the message carries no key.
 *
 * @param message  the message whose key is encoded
 * @param encoding the encoding to apply to the key bytes
 */
std::optional<std::string> get_encoded_message_key(const rd_kafka_message_t& message, KafkaEncoding encoding) {
  if (nullptr == message.key) {
    return {};
  }
  return get_encoded_string({static_cast<const char*>(message.key), message.key_len}, encoding);
}

}  // namespace org::apache::nifi::minifi::utils