extensions/mqtt/processors/PublishMQTT.cpp (242 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "PublishMQTT.h"

#include <algorithm>
#include <cinttypes>
#include <future>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/state/Value.h"

namespace {
// Thrown by InFlightMessageCounter::increase() when no in-flight slot frees up within the wait timeout.
// It is a std::runtime_error, so it is NOT caught by the `catch (const Exception&)` in onTriggerImpl.
class RetriableError : public std::runtime_error {
  using std::runtime_error::runtime_error;
};
}  // namespace

namespace org::apache::nifi::minifi::processors {

// Task handed to the Paho callbacks through the response options' context pointer.
// Arguments: (success, response code, MQTT 5 reason code); the result is what sendMessage() returns.
using SendFinishedTask = std::packaged_task<bool(bool, std::optional<int>, std::optional<MQTTReasonCodes>)>;

// Registers the processor's supported properties and relationships.
void PublishMQTT::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}

/**
 * Reads the publisher-specific properties from the context.
 *
 * Throws a PROCESS_SCHEDULE_EXCEPTION if Topic is not set. The in-flight message counter
 * is enabled only for MQTT 5.0 with a QoS other than level 0.
 */
void PublishMQTT::readProperties(core::ProcessContext& context) {
  if (!context.getProperty(Topic).has_value()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT: Topic is required");
  }
  retain_ = utils::parseBoolProperty(context, Retain);
  message_expiry_interval_ = utils::parseOptionalDurationProperty(context, MessageExpiryInterval)
      | utils::transform([](const auto&& ms) { return std::chrono::duration_cast<std::chrono::seconds>(ms); });

  in_flight_message_counter_.setEnabled(mqtt_version_ == mqtt::MqttVersions::V_5_0 && qos_ != mqtt::MqttQoS::LEVEL_0);
}

/**
 * Takes one flow file from the session, publishes its content and routes it to Success or Failure.
 *
 * Yields when there is no flow file. A read error, a failed send, or a minifi Exception raised
 * while sending routes the flow file to Failure.
 */
void PublishMQTT::onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) {
  std::shared_ptr<core::FlowFile> flow_file = session.get();

  if (!flow_file) {
    yield();
    return;
  }

  // broker's Receive Maximum can change after reconnect
  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));

  const auto topic = getTopic(context, flow_file.get());
  try {
    const auto result = session.readBuffer(flow_file);
    if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file.get()), flow_file)) {
      logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on broker {}", flow_file->getUUIDStr(), topic, uri_);
      session.transfer(flow_file, Failure);
      return;
    }
    logger_->log_debug("Sent flow file [{}] with length {} to MQTT topic '{}' on broker {}", flow_file->getUUIDStr(), result.status, topic, uri_);
    session.transfer(flow_file, Success);
  } catch (const Exception& ex) {
    logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on broker {}, exception string: '{}'", flow_file->getUUIDStr(), topic, uri_, ex.what());
    session.transfer(flow_file, Failure);
  }
}

/**
 * Publishes `buffer` to `topic` and blocks until the client library reports the outcome.
 *
 * @param buffer       payload to publish
 * @param topic        destination topic
 * @param content_type MQTT 5 Content Type; ignored when empty or when not using MQTT 5
 * @param flow_file    flow file whose attributes are attached as MQTT 5 user properties
 * @return false if the payload exceeds the protocol or broker packet size limit, if
 *         MQTTAsync_sendMessage rejects the message, or if the failure callback fires;
 *         true if the success callback fires.
 * @throws RetriableError (from InFlightMessageCounter::increase) if no in-flight slot becomes available in time.
 */
bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) {
  static constexpr size_t max_packet_size = 256_MiB - 1;
  if (buffer.size() > max_packet_size) {
    logger_->log_error("Sending message failed because MQTT limit maximum packet size [{}] is exceeded by FlowFile of [{}]", std::to_string(max_packet_size), buffer.size());
    return false;
  }

  if (maximum_packet_size_.has_value() && buffer.size() > *(maximum_packet_size_)) {
    logger_->log_error("Sending message failed because broker-requested maximum packet size [{}] is exceeded by FlowFile of [{}]", *maximum_packet_size_, buffer.size());
    return false;
  }

  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
  // MQTTProperties_add() allocates storage inside message_to_publish.properties. The client library
  // works on its own copy once the message is queued, so ours must be released on every exit path
  // (normal return, early failure, or an exception such as RetriableError).
  const auto properties_guard = gsl::finally([&message_to_publish] { MQTTProperties_free(&message_to_publish.properties); });

  message_to_publish.payload = const_cast<std::byte*>(buffer.data());
  message_to_publish.payloadlen = gsl::narrow<int>(buffer.size());
  message_to_publish.qos = static_cast<int>(qos_);
  message_to_publish.retained = retain_;

  setMqtt5Properties(message_to_publish, content_type, flow_file);

  MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
  if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
    response_options.onSuccess5 = sendSuccess5;
    response_options.onFailure5 = sendFailure5;
  } else {
    response_options.onSuccess = sendSuccess;
    response_options.onFailure = sendFailure;
  }

  // save context for callback
  SendFinishedTask send_finished_task(
      [this] (const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
        return notify(success, response_code, reason_code);
      });
  response_options.context = &send_finished_task;

  in_flight_message_counter_.increase();

  const int error_code = MQTTAsync_sendMessage(client_, topic.c_str(), &message_to_publish, &response_options);
  if (error_code != MQTTASYNC_SUCCESS) {
    logger_->log_error("MQTTAsync_sendMessage failed on topic '{}', MQTT broker {} with error code [{}]", topic, uri_, error_code);
    // early fail, sending attempt did not succeed, no need to wait for callback
    in_flight_message_counter_.decrease();
    return false;
  }

  // Blocks until one of the send callbacks runs the task; the task lives on this stack frame until then.
  return send_finished_task.get_future().get();
}

// Warns about explicitly set properties (Message Expiry Interval, Content Type) that have no effect
// when an MQTT 3.x protocol version is configured.
void PublishMQTT::checkProperties() {
  auto is_property_explicitly_set = [this](const std::string_view property_name) -> bool {
    const auto property_values = getAllPropertyValues(property_name) | utils::orThrow("It should only be called on valid property");
    return !property_values.empty();
  };
  if ((mqtt_version_ == mqtt::MqttVersions::V_3_1_0 || mqtt_version_ == mqtt::MqttVersions::V_3_1_1 || mqtt_version_ == mqtt::MqttVersions::V_3X_AUTO)) {
    if (is_property_explicitly_set(MessageExpiryInterval.name)) {
      logger_->log_warn("MQTT 3.x specification does not support Message Expiry Intervals. Property is not used.");
    }
    if (is_property_explicitly_set(ContentType.name)) {
      logger_->log_warn("MQTT 3.x specification does not support Content Types. Property is not used.");
    }
  }
}

// Throws a PROCESS_SCHEDULE_EXCEPTION when Retain is requested but retain_available_ is false.
void PublishMQTT::checkBrokerLimitsImpl() {
  if (retain_available_ == false && retain_) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Retain was set but broker does not support it");
  }
}

// Success callback for the non-MQTT-5 path; `context` is the SendFinishedTask set up in sendMessage().
void PublishMQTT::sendSuccess(void* context, MQTTAsync_successData* /*response*/) {
  auto send_finished_task = reinterpret_cast<SendFinishedTask*>(context);
  (*send_finished_task)(true, std::nullopt, std::nullopt);
}

// Success callback for MQTT 5; always forwards the response's reason code.
void PublishMQTT::sendSuccess5(void* context, MQTTAsync_successData5* response) {
  auto send_finished_task = reinterpret_cast<SendFinishedTask*>(context);
  (*send_finished_task)(true, std::nullopt, response->reasonCode);
}

// Failure callback for the non-MQTT-5 path; forwards the response code.
void PublishMQTT::sendFailure(void* context, MQTTAsync_failureData* response) {
  auto send_finished_task = reinterpret_cast<SendFinishedTask*>(context);
  (*send_finished_task)(false, response->code, std::nullopt);
}

// Failure callback for MQTT 5; forwards both the response code and the reason code.
void PublishMQTT::sendFailure5(void* context, MQTTAsync_failureData5* response) {
  auto send_finished_task = reinterpret_cast<SendFinishedTask*>(context);
  (*send_finished_task)(false, response->code, response->reasonCode);
}

/**
 * Runs when a send callback fires: releases the in-flight slot and logs the outcome.
 *
 * @param success       whether the success callback (true) or the failure callback (false) fired
 * @param response_code response code; dereferenced unconditionally on failure (both failure callbacks set it)
 * @param reason_code   MQTT 5 reason code, if any
 * @return `success`, unchanged
 */
bool PublishMQTT::notify(const bool success, const std::optional<int> response_code, const std::optional<MQTTReasonCodes> reason_code) {
  in_flight_message_counter_.decrease();

  if (success) {
    logger_->log_debug("Successfully sent message to MQTT broker {}", uri_);
    if (reason_code.has_value()) {
      // Not an error: sendSuccess5 always forwards a reason code, so this is reached on every
      // successful MQTT 5 publish and must not be reported at error level.
      logger_->log_debug("Additional reason code for sending success: {}: {}", magic_enum::enum_underlying(*reason_code), MQTTReasonCode_toString(*reason_code));
    }
  } else {
    logger_->log_error("Sending message failed to MQTT broker {} with response code {}", uri_, *response_code);
    if (reason_code.has_value()) {
      logger_->log_error("Reason code for sending failure: {}: {}", magic_enum::enum_underlying(*reason_code), MQTTReasonCode_toString(*reason_code));
    }
  }

  return success;
}

// Resolves the Topic property against the flow file; throws a GENERAL_EXCEPTION if it has no value.
std::string PublishMQTT::getTopic(core::ProcessContext& context, const core::FlowFile* const flow_file) const {
  if (auto value = context.getProperty(Topic, flow_file)) {
    logger_->log_debug("PublishMQTT: Topic resolved as \"{}\"", *value);
    return *value;
  }
  throw minifi::Exception(ExceptionType::GENERAL_EXCEPTION, "Could not resolve required property Topic");
}

// Resolves the Content Type property against the flow file; returns an empty string if it has no value.
std::string PublishMQTT::getContentType(core::ProcessContext& context, const core::FlowFile* const flow_file) const {
  if (auto value = context.getProperty(ContentType, flow_file)) {
    logger_->log_debug("PublishMQTT: Content Type resolved as \"{}\"", *value);
    return *value;
  }
  return "";
}

/**
 * Adds the MQTT 5 properties (message expiry interval, content type, user properties) to `message`.
 *
 * Does nothing unless MQTT 5.0 is configured. Each MQTTProperties_add() call allocates inside
 * message.properties; the caller is responsible for releasing it with MQTTProperties_free().
 */
void PublishMQTT::setMqtt5Properties(MQTTAsync_message& message, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) const {
  if (mqtt_version_ != mqtt::MqttVersions::V_5_0) {
    return;
  }

  if (message_expiry_interval_.has_value()) {
    MQTTProperty property;
    property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
    property.value.integer4 = gsl::narrow<int>(message_expiry_interval_->count());  // NOLINT(cppcoreguidelines-pro-type-union-access)
    MQTTProperties_add(&message.properties, &property);
  }

  if (!content_type.empty()) {
    MQTTProperty property;
    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
    property.value.data.len = gsl::narrow<int>(content_type.length());  // NOLINT(cppcoreguidelines-pro-type-union-access)
    property.value.data.data = const_cast<char*>(content_type.data());  // NOLINT(cppcoreguidelines-pro-type-union-access)
    MQTTProperties_add(&message.properties, &property);
  }

  addAttributesAsUserProperties(message, flow_file);
}

// Adds every flow file attribute to `message` as an MQTT 5 user property (attribute name -> key, attribute value -> value).
void PublishMQTT::addAttributesAsUserProperties(MQTTAsync_message& message, const std::shared_ptr<core::FlowFile>& flow_file) {
  for (const auto& [key, value] : *flow_file->getAttributesPtr()) {
    MQTTProperty property;
    property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;

    // key
    property.value.data.len = gsl::narrow<int>(key.length());  // NOLINT(cppcoreguidelines-pro-type-union-access)
    property.value.data.data = const_cast<char*>(key.data());  // NOLINT(cppcoreguidelines-pro-type-union-access)

    // value
    property.value.value.len = gsl::narrow<int>(value.length());  // NOLINT(cppcoreguidelines-pro-type-union-access)
    property.value.value.data = const_cast<char*>(value.data());  // NOLINT(cppcoreguidelines-pro-type-union-access)

    MQTTProperties_add(&message.properties, &property);
  }
}

// Updates the in-flight limit under the mutex and wakes one waiter; no-op while the counter is disabled.
void PublishMQTT::InFlightMessageCounter::setMax(const uint16_t new_limit) {
  if (!enabled_) {
    return;
  }

  {
    std::lock_guard lock{mutex_};
    limit_ = new_limit;
  }
  cv_.notify_one();
}

// increase on sending, wait if limit is reached
// Waits up to 5 seconds for counter_ < limit_; throws RetriableError on timeout. No-op while disabled.
void PublishMQTT::InFlightMessageCounter::increase() {
  using namespace std::literals::chrono_literals;

  if (!enabled_) {
    return;
  }

  std::unique_lock lock{mutex_};
  const bool success = cv_.wait_for(lock, 5s, [this] { return counter_ < limit_; });
  if (!success) {
    throw RetriableError{"Timed out while waiting for a free upload slot on the MQTT server"};
  }
  ++counter_;
}

// decrease on success or failure, notify
// The waiter is notified after the mutex has been released. No-op while disabled.
void PublishMQTT::InFlightMessageCounter::decrease() {
  if (!enabled_) {
    return;
  }

  {
    std::lock_guard lock{mutex_};
    --counter_;
  }
  cv_.notify_one();
}

// Returns the current in-flight count, read under the mutex.
uint16_t PublishMQTT::InFlightMessageCounter::getCounter() const {
  std::lock_guard lock{mutex_};
  return counter_;
}

// Stores a non-owning pointer to the counter; the counter must outlive this metrics object.
PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const core::Processor& source_processor, const InFlightMessageCounter& in_flight_message_counter)
  : core::ProcessorMetricsImpl(source_processor),
    in_flight_message_counter_(&in_flight_message_counter) {
}

// Extends the base processor metrics node with an "InFlightMessageCount" child.
std::vector<state::response::SerializedResponseNode> PublishMQTT::PublishMQTTMetrics::serialize() {
  auto metrics_vector = core::ProcessorMetricsImpl::serialize();
  gsl_Expects(!metrics_vector.empty());
  auto& metrics = metrics_vector[0];

  state::response::SerializedResponseNode in_flight_message_count_node{"InFlightMessageCount", static_cast<uint32_t>(in_flight_message_counter_->getCounter())};
  metrics.children.push_back(in_flight_message_count_node);

  return metrics_vector;
}

// Extends the base published metrics with "in_flight_message_count".
std::vector<state::PublishedMetric> PublishMQTT::PublishMQTTMetrics::calculateMetrics() {
  auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
  metrics.push_back({"in_flight_message_count", static_cast<double>(in_flight_message_counter_->getCounter()), getCommonLabels()});
  return metrics;
}

REGISTER_RESOURCE(PublishMQTT, Processor);

}  // namespace org::apache::nifi::minifi::processors