extensions/kafka/ConsumeKafka.cpp (272 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ConsumeKafka.h"

#include <algorithm>

#include "core/FlowFile.h"
#include "core/ProcessSession.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/gsl.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi {

namespace core {
// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start
// reporting issues with the processor health otherwise
//
// Returns true only if the input parses as a duration within [0 ms, 4 s].
bool ConsumeKafkaMaxPollTimePropertyValidator::validate(const std::string_view input) const {
  const auto parsed_time = parsing::parseDurationMinMax<std::chrono::nanoseconds>(input, 0ms, 4s);
  return parsed_time.has_value();
}
}  // namespace core

namespace processors {

// Registers the processor's supported properties and relationships.
void ConsumeKafka::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}

// Reads the processor configuration from the context, validates the encoding properties and
// sets up a new Kafka consumer connection.
// Throws Exception(PROCESS_SCHEDULE_EXCEPTION) if an encoding property holds an unsupported value.
void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  // Required properties
  kafka_brokers_ = utils::parseProperty(context, KafkaBrokers);
  topic_names_ = utils::string::splitAndTrim(utils::parseProperty(context, TopicNames), ",");
  topic_name_format_ = utils::parseProperty(context, TopicNameFormat);
  honor_transactions_ = utils::parseBoolProperty(context, HonorTransactions);
  group_id_ = utils::parseProperty(context, GroupID);
  offset_reset_ = utils::parseProperty(context, OffsetReset);
  key_attribute_encoding_ = utils::parseProperty(context, KeyAttributeEncoding);
  max_poll_time_milliseconds_ = utils::parseDurationProperty(context, MaxPollTime);
  session_timeout_milliseconds_ = utils::parseDurationProperty(context, SessionTimeout);

  // Optional properties
  message_demarcator_ = context.getProperty(MessageDemarcator).value_or("");
  message_header_encoding_ = context.getProperty(MessageHeaderEncoding).value_or("");
  duplicate_header_handling_ = context.getProperty(DuplicateHeaderHandling).value_or("");

  headers_to_add_as_attributes_ = utils::string::splitAndTrim(utils::parseOptionalProperty(context, HeadersToAddAsAttributes).value_or(""), ",");
  max_poll_records_ = gsl::narrow<std::size_t>(utils::parseU64Property(context, MaxPollRecords));

  if (!utils::string::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::string::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported key attribute encoding: " + key_attribute_encoding_);
  }

  if (!utils::string::equalsIgnoreCase(MSG_HEADER_ENCODING_UTF_8, message_header_encoding_) && !utils::string::equalsIgnoreCase(MSG_HEADER_ENCODING_HEX, message_header_encoding_)) {
    // Report the offending header encoding value (not the key attribute encoding)
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported message header encoding: " + message_header_encoding_);
  }

  configure_new_connection(context);
}

namespace {
// librdkafka rebalance callback: applies the partition assignment or revocation requested by the
// consumer group and logs the outcome.
void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
  // Cooperative, incremental assignment is not supported in the current librdkafka version
  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
  logger->log_debug("Rebalance triggered.");
  rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR;
  switch (trigger) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
      logger->log_debug("assigned:");
      if (logger->should_log(core::logging::LOG_LEVEL::debug)) {
        utils::print_topics_list(*logger, *partitions);
      }
      assign_error = rd_kafka_assign(rk, partitions);
      break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: {
      logger->log_debug("revoked:");
      // Sync commit, maybe unnecessary
      const rd_kafka_resp_err_t commit_error = rd_kafka_commit(rk, partitions, /* async = */ 0);
      if (RD_KAFKA_RESP_ERR_NO_ERROR != commit_error) {
        // Only logged at debug level: the commit here is best-effort
        logger->log_debug("commit on revoke failed: {}", rd_kafka_err2str(commit_error));
      }
      if (logger->should_log(core::logging::LOG_LEVEL::debug)) {
        utils::print_topics_list(*logger, *partitions);
      }
      assign_error = rd_kafka_assign(rk, nullptr);
      break;
    }

    default:
      logger->log_debug("failed: {}", rd_kafka_err2str(trigger));
      assign_error = rd_kafka_assign(rk, nullptr);
      break;
  }
  // Only report an assignment failure when the assignment actually failed
  if (RD_KAFKA_RESP_ERR_NO_ERROR != assign_error) {
    logger->log_error("assign failure: {}", rd_kafka_err2str(assign_error));
  }
}
}  // namespace

// Builds the topic partition list from the configured topic names and subscribes the consumer to it.
// Subscription errors are logged, not thrown.
void ConsumeKafka::create_topic_partition_list() {
  kf_topic_partition_list_ = utils::rd_kafka_topic_partition_list_unique_ptr{ rd_kafka_topic_partition_list_new(gsl::narrow<int>(topic_names_.size()))};

  // On subscriptions any topics prefixed with ^ will be regex matched
  const bool topics_are_patterns = utils::string::equalsIgnoreCase(TOPIC_FORMAT_PATTERNS, topic_name_format_);
  for (const std::string& topic : topic_names_) {
    const std::string subscription_name = topics_are_patterns ? "^" + topic : topic;
    rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), subscription_name.c_str(), RD_KAFKA_PARTITION_UA);
  }

  // Subscribe to topic set using balanced consumer groups
  // Subscribing from the same process without an inbetween unsubscribe call
  // Does not seem to be triggering a rebalance (maybe librdkafka bug?)
  // This might happen until the cross-overship between processors and connections are settled
  rd_kafka_resp_err_t subscribe_response = rd_kafka_subscribe(consumer_.get(), kf_topic_partition_list_.get());
  if (RD_KAFKA_RESP_ERR_NO_ERROR != subscribe_response) {
    logger_->log_error("rd_kafka_subscribe error {}: {}", magic_enum::enum_underlying(subscribe_response), rd_kafka_err2str(subscribe_response));
  }
}

// Copies every dynamic property of the processor into the Kafka configuration as a key/value field.
void ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessContext& context) {
  using utils::setKafkaConfigurationField;

  const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys();
  if (dynamic_prop_keys.empty()) {
    return;
  }
  logger_->log_info("Loading {} extra kafka configuration fields from ConsumeKafka dynamic properties:", dynamic_prop_keys.size());
  for (const std::string& key : dynamic_prop_keys) {
    std::string value = context.getDynamicProperty(key)
        | utils::orThrow(fmt::format("This shouldn't happen, dynamic property {} is expected because we just queried the list of dynamic properties", key));
    logger_->log_info("{}: {}", key.c_str(), value.c_str());
    setKafkaConfigurationField(*conf_, key, value);
  }
}

// Creates the Kafka configuration and the consumer handle, subscribes to the configured topics and
// redirects the main poll queue to the consumer queue.
// Throws Exception(PROCESS_SCHEDULE_EXCEPTION) if the configuration or the consumer cannot be created.
void ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
  using utils::setKafkaConfigurationField;

  conf_ = {rd_kafka_conf_new(), utils::rd_kafka_conf_deleter()};
  if (conf_ == nullptr) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
  }

  // Set rebalance callback for use with coordinated consumer group balancing
  // Rebalance handlers are needed for the initial configuration of the consumer
  // If they are not set, offset reset is ignored and polling produces messages
  // Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that
  // responsibility to the application's rebalance_cb.
  rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);

  // Uncomment this for librdkafka debug logs:
  // logger_->log_info("Enabling all debug logs for kafka consumer.");
  // setKafkaConfigurationField(*conf_, "debug", "all");

  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));

  setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
  setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
  setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
  setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
  setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
  setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ? "read_committed" : "read_uncommitted");
  setKafkaConfigurationField(*conf_, "group.id", group_id_);
  setKafkaConfigurationField(*conf_, "session.timeout.ms", std::to_string(session_timeout_milliseconds_.count()));
  // Twice the default, arbitrarily chosen
  setKafkaConfigurationField(*conf_, "max.poll.interval.ms", "600000");

  // This is a librdkafka option, but the communication timeout is also specified in each of the
  // relevant API calls. Could be redundant, but it probably does not hurt to set this
  setKafkaConfigurationField(*conf_, "metadata.request.timeout.ms", std::to_string(METADATA_COMMUNICATIONS_TIMEOUT_MS));

  extend_config_from_dynamic_properties(context);

  std::array<char, 512U> errstr{};
  // rd_kafka_new() takes ownership of the configuration object only on success, so conf_ keeps
  // owning it until the outcome is known; otherwise a failed creation would leak the configuration.
  consumer_ = {rd_kafka_new(RD_KAFKA_CONSUMER, conf_.get(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter()};
  if (consumer_ == nullptr) {
    const std::string error_msg{errstr.data()};
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka consumer " + error_msg);
  }
  // The consumer now owns the configuration object, it must not be destroyed by us
  static_cast<void>(conf_.release());

  create_topic_partition_list();

  // Changing the partition list should happen only as part of the initialization of offsets
  // a function like `rd_kafka_position()` might have unexpected effects
  // for instance when a consumer gets assigned a partition it used to
  // consume at an earlier rebalance.
  //
  // As far as I understand, instead of rd_kafka_position() an rd_kafka_committed() call is preferred here,
  // as it properly fetches offsets from the broker
  if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(), kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) {
    logger_->log_error("Retrieving committed offsets for topics+partitions failed.");
  }

  rd_kafka_resp_err_t poll_set_consumer_response = rd_kafka_poll_set_consumer(consumer_.get());
  if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
    logger_->log_error("rd_kafka_poll_set_consumer error {}: {}", magic_enum::enum_underlying(poll_set_consumer_response), rd_kafka_err2str(poll_set_consumer_response));
  }
}

// Returns the payload of the message as a string.
// Throws Exception(PROCESSOR_EXCEPTION) if the message carries an error code.
std::string ConsumeKafka::extract_message(const rd_kafka_message_t& rkmessage) {
  if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "ConsumeKafka: received error message from broker: " + std::to_string(rkmessage.err) + " " + rd_kafka_err2str(rkmessage.err));
  }
  return {static_cast<char*>(rkmessage.payload), rkmessage.len};
}

std::vector<utils::rd_kafka_message_unique_ptr> ConsumeKafka::poll_kafka_messages() { std::vector<utils::rd_kafka_message_unique_ptr> messages; messages.reserve(max_poll_records_); const auto start = std::chrono::steady_clock::now(); auto elapsed = std::chrono::steady_clock::now() - start; while (messages.size() < max_poll_records_ && elapsed < max_poll_time_milliseconds_) { logger_->log_debug("Polling for new messages for {}...", max_poll_time_milliseconds_); const auto timeout_ms = gsl::narrow<int>(std::chrono::duration_cast<std::chrono::milliseconds>(max_poll_time_milliseconds_ - elapsed).count()); utils::rd_kafka_message_unique_ptr message{rd_kafka_consumer_poll(consumer_.get(), timeout_ms)}; if (!message) { break; } if (RD_KAFKA_RESP_ERR_NO_ERROR != message->err) { logger_->log_error("Received message with error {}: {}", magic_enum::enum_underlying(message->err), rd_kafka_err2str(message->err)); break; } utils::print_kafka_message(*message, *logger_); messages.emplace_back(std::move(message)); elapsed = std::chrono::steady_clock::now() - start; } return messages; } utils::KafkaEncoding ConsumeKafka::key_attr_encoding_attr_to_enum() const { if (utils::string::equalsIgnoreCase(key_attribute_encoding_, KEY_ATTR_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; } if (utils::string::equalsIgnoreCase(key_attribute_encoding_, KEY_ATTR_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; } throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Key Attribute Encoding\" property not recognized."); } utils::KafkaEncoding ConsumeKafka::message_header_encoding_attr_to_enum() const { if (utils::string::equalsIgnoreCase(message_header_encoding_, MSG_HEADER_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; } if (utils::string::equalsIgnoreCase(message_header_encoding_, MSG_HEADER_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; } throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Message Header Encoding\" property not recognized."); } std::string 
ConsumeKafka::resolve_duplicate_headers(const std::vector<std::string>& matching_headers) const { if (MSG_HEADER_KEEP_FIRST == duplicate_header_handling_) { return matching_headers.front(); } if (MSG_HEADER_KEEP_LATEST == duplicate_header_handling_) { return matching_headers.back(); } if (MSG_HEADER_COMMA_SEPARATED_MERGE == duplicate_header_handling_) { return utils::string::join(", ", matching_headers); } throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Duplicate Header Handling\" property not recognized."); } std::vector<std::string> ConsumeKafka::get_matching_headers(const rd_kafka_message_t& message, const std::string& header_name) const { // Headers fetched this way are freed when rd_kafka_message_destroy is called // Detaching them using rd_kafka_message_detach_headers does not seem to work rd_kafka_headers_t* headers_raw = nullptr; const rd_kafka_resp_err_t get_header_response = rd_kafka_message_headers(&message, &headers_raw); if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) { return {}; } if (RD_KAFKA_RESP_ERR_NO_ERROR != get_header_response) { logger_->log_error("Failed to fetch message headers: {}: {}", magic_enum::enum_underlying(rd_kafka_last_error()), rd_kafka_err2str(rd_kafka_last_error())); } std::vector<std::string> matching_headers; for (std::size_t header_idx = 0;; ++header_idx) { const char* value = nullptr; // Not to be freed std::size_t size = 0; if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), reinterpret_cast<const void**>(&value), &size)) { break; } if (size < 200) { logger_->log_debug("{:.{}}", value, size); } else { logger_->log_debug("{:.{}}...", value, 200); } matching_headers.emplace_back(value, size); } return matching_headers; } std::vector<std::pair<std::string, std::string>> ConsumeKafka::get_flowfile_attributes_from_message_header(const rd_kafka_message_t& message) const { std::vector<std::pair<std::string, std::string>> attributes_from_headers; for (const 
std::string& header_name: headers_to_add_as_attributes_) { const std::vector<std::string> matching_headers = get_matching_headers(message, header_name); if (!matching_headers.empty()) { attributes_from_headers.emplace_back(header_name, utils::get_encoded_string(resolve_duplicate_headers(matching_headers), message_header_encoding_attr_to_enum())); } } return attributes_from_headers; } void ConsumeKafka::add_kafka_attributes_to_flowfile(core::FlowFile& flow_file, const rd_kafka_message_t& message) const { // We do not currently support batching messages into a single flowfile flow_file.setAttribute(KAFKA_COUNT_ATTR, "1"); if (const auto message_key = utils::get_encoded_message_key(message, key_attr_encoding_attr_to_enum())) { flow_file.setAttribute(KAFKA_MESSAGE_KEY_ATTR, *message_key); } flow_file.setAttribute(KAFKA_OFFSET_ATTR, std::to_string(message.offset)); flow_file.setAttribute(KAFKA_PARTITION_ATTR, std::to_string(message.partition)); flow_file.setAttribute(KAFKA_TOPIC_ATTR, rd_kafka_topic_name(message.rkt)); } std::optional<std::vector<std::shared_ptr<core::FlowFile>>> ConsumeKafka::transform_pending_messages_into_flowfiles(core::ProcessSession& session) const { std::vector<std::shared_ptr<core::FlowFile>> flow_files_created; for (const auto& message: pending_messages_) { std::string message_content = extract_message(*message); std::vector<std::pair<std::string, std::string>> attributes_from_headers = get_flowfile_attributes_from_message_header(*message); std::vector<std::string> split_message{ !message_demarcator_.empty() ? 
utils::string::split(message_content, message_demarcator_) : std::vector<std::string>{message_content}}; for (auto& flowfile_content: split_message) { auto flow_file = session.create(); if (flow_file == nullptr) { logger_->log_error("Failed to create flowfile."); // Either transform all flowfiles or none return {}; } // flowfile content is consumed here session.writeBuffer(flow_file, flowfile_content); for (const auto& kv: attributes_from_headers) { flow_file->setAttribute(kv.first, kv.second); } add_kafka_attributes_to_flowfile(*flow_file, *message); flow_files_created.emplace_back(std::move(flow_file)); } } return {flow_files_created}; } void ConsumeKafka::process_pending_messages(core::ProcessSession& session) { std::optional<std::vector<std::shared_ptr<core::FlowFile>>> flow_files_created = transform_pending_messages_into_flowfiles(session); if (!flow_files_created) { return; } for (const auto& flow_file: flow_files_created.value()) { session.transfer(flow_file, Success); } session.commit(); // Commit the offset from the latest message only if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_commit_message(consumer_.get(), pending_messages_.back().get(), /* async = */ 0)) { logger_->log_error("Committing offset failed."); } pending_messages_.clear(); } void ConsumeKafka::onTrigger(core::ProcessContext&, core::ProcessSession& session) { std::unique_lock<std::mutex> lock(do_not_call_on_trigger_concurrently_); logger_->log_debug("ConsumeKafka onTrigger"); if (!pending_messages_.empty()) { process_pending_messages(session); return; } pending_messages_ = poll_kafka_messages(); if (pending_messages_.empty()) { return; } process_pending_messages(session); } REGISTER_RESOURCE(ConsumeKafka, Processor); } // namespace processors } // namespace org::apache::nifi::minifi