extensions/mqtt/processors/ConsumeMQTT.cpp (297 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ConsumeMQTT.h"

#include <algorithm>
#include <chrono>
#include <cinttypes>
#include <memory>
#include <optional>
#include <queue>
#include <set>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>

#include "utils/StringUtils.h"
#include "utils/ValueParser.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"

namespace org::apache::nifi::minifi::processors {

/**
 * Registers the processor's supported properties and relationships.
 */
void ConsumeMQTT::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}

/**
 * Puts a received message into the internal queue.
 * The message is dropped (and an error is logged) when the approximate queue size
 * has already reached max_queue_size_.
 */
void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
  if (queue_.size_approx() >= max_queue_size_) {
    logger_->log_error("MQTT queue full");
    return;
  }

  logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen);
  queue_.enqueue(std::move(message));
}

/**
 * Reads the ConsumeMQTT specific properties from the context into the member fields.
 * A member is only overwritten when its property is present (and parseable, where parsing applies);
 * the resulting value of each member is logged at debug level.
 */
void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& context) {
  if (auto value = context->getProperty(Topic)) {
    topic_ = std::move(*value);
  }
  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);

  if (const auto value = context->getProperty(CleanSession) | utils::flatMap(&utils::StringUtils::toBool)) {
    clean_session_ = *value;
  }
  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);

  if (const auto value = context->getProperty(CleanStart) | utils::flatMap(&utils::StringUtils::toBool)) {
    clean_start_ = *value;
  }
  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);

  if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval) | utils::flatMap(&core::TimePeriodValue::fromString)) {
    session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
  }
  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()});

  if (const auto value = context->getProperty(QueueBufferMaxMessage) | utils::flatMap(&utils::toNumber<uint64_t>)) {
    max_queue_size_ = *value;
  }
  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_);

  if (auto value = context->getProperty(AttributeFromContentType)) {
    attribute_from_content_type_ = std::move(*value);
  }
  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_);

  if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum) | utils::flatMap(&utils::toNumber<uint32_t>)) {
    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
  }
  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_);

  if (const auto receive_maximum = context->getProperty(ReceiveMaximum) | utils::flatMap(&utils::toNumber<uint32_t>)) {
    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
  }
  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_);
}

/**
 * Drains the received messages and creates one flow file per message.
 * The payload is written as flow file content; on success the broker URI, the topic,
 * the user properties and (if configured) the content type are added as attributes
 * and the flow file is transferred to Success. Flow files whose write callback reports
 * failure are removed from the session.
 */
void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
  while (!msg_queue.empty()) {
    const auto& message = msg_queue.front();
    std::shared_ptr<core::FlowFile> flow_file = session->create();
    WriteCallback write_callback(message, logger_);
    try {
      session->write(flow_file, std::ref(write_callback));
    } catch (const Exception& ex) {
      // NOTE(review): the outcome below is decided solely by the callback's status; an exception thrown
      // before the callback records a failure is only logged here - confirm this is intended
      logger_->log_error("Error when processing message queue: %s", ex.what());
    }

    if (!write_callback.getSuccessStatus()) {
      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr());
      session->remove(flow_file);
    } else {
      putUserPropertiesAsAttributes(message, flow_file, session);
      session->putAttribute(flow_file, BrokerOutputAttribute.name, uri_);
      session->putAttribute(flow_file, TopicOutputAttribute.name, message.topic);
      fillAttributeFromContentType(message, flow_file, session);
      logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
      session->transfer(flow_file, Success);
    }

    // `message` refers to the front element, so it must only be popped after all uses above
    msg_queue.pop();
  }
}

/**
 * Moves every message that can currently be dequeued from the internal queue into a std::queue.
 * @return the dequeued messages in dequeue order
 */
std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
  std::queue<SmartMessage> msg_queue;
  SmartMessage message;
  while (queue_.try_dequeue(message)) {
    msg_queue.push(std::move(message));
  }
  return msg_queue;
}

/**
 * Writes the payload of the wrapped message to the given stream.
 * @return the number of bytes written, or -1 (with the success status set to false)
 *         if the payload length is negative or the stream reports a write error
 */
int64_t ConsumeMQTT::WriteCallback::operator() (const std::shared_ptr<io::OutputStream>& stream) {
  if (message_.contents->payloadlen < 0) {
    success_status_ = false;
    logger_->log_error("Payload length of message is negative, value is [%d]", message_.contents->payloadlen);
    return -1;
  }

  const auto len = stream->write(static_cast<uint8_t*>(message_.contents->payload), gsl::narrow<size_t>(message_.contents->payloadlen));
  if (io::isError(len)) {
    success_status_ = false;
    logger_->log_error("Stream writing error when processing message");
    return -1;
  }

  return gsl::narrow<int64_t>(len);
}

/**
 * Adds every MQTT user property of the message as a flow file attribute (key -> value).
 * Does nothing unless the configured MQTT version is 5.0.
 */
void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
  if (mqtt_version_ != mqtt::MqttVersions::V_5_0) {
    return;
  }

  const auto property_count = MQTTProperties_propertyCount(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY);
  for (int i = 0; i < property_count; ++i) {
    const MQTTProperty* property = MQTTProperties_getPropertyAt(&message.contents->properties, MQTTPROPERTY_CODE_USER_PROPERTY, i);
    if (property == nullptr) {
      // defensive: the pointer is dereferenced below, mirror the null check done in fillAttributeFromContentType
      continue;
    }
    std::string key(property->value.data.data, property->value.data.len);
    std::string value(property->value.value.data, property->value.value.len);
    session->putAttribute(flow_file, key, value);
  }
}

/**
 * Stores the message's MQTT content type in the flow file attribute named by attribute_from_content_type_.
 * Does nothing unless the configured MQTT version is 5.0, the attribute name is non-empty
 * and the message carries a content type property.
 */
void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
  if (mqtt_version_ != mqtt::MqttVersions::V_5_0 || attribute_from_content_type_.empty()) {
    return;
  }

  MQTTProperty* property = MQTTProperties_getProperty(&message.contents->properties, MQTTPROPERTY_CODE_CONTENT_TYPE);
  if (property == nullptr) {
    return;
  }

  std::string content_type(property->value.data.data, property->value.data.len);
  session->putAttribute(flow_file, attribute_from_content_type_, content_type);
}

/**
 * Issues the subscription request for topic_ with the configured QoS.
 * The success/failure callbacks matching the configured MQTT version are registered
 * with this processor as their context.
 */
void ConsumeMQTT::startupClient() {
  MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer;
  response_options.context = this;

  if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
    response_options.onSuccess5 = subscriptionSuccess5;
    response_options.onFailure5 = subscriptionFailure5;
  } else {
    response_options.onSuccess = subscriptionSuccess;
    response_options.onFailure = subscriptionFailure;
  }

  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), gsl::narrow<int>(qos_), &response_options);
  if (ret != MQTTASYNC_SUCCESS) {
    logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
    return;
  }
  logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
}

/**
 * Handles a message received from the broker.
 * For MQTT 5.0 the topic is first resolved through the topic alias mapping.
 * Messages that still have no topic afterwards are dropped with an error log,
 * all others are enqueued.
 */
void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
  if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
    resolveTopicFromAlias(smart_message);
  }

  if (smart_message.topic.empty()) {
    logger_->log_error("Received message without topic");
    return;
  }

  enqueueReceivedMQTTMsg(std::move(smart_message));
}

/**
 * Applies the topic alias property of the message, if there is one.
 * An alias arriving together with a topic is stored in alias_to_topic_;
 * an alias arriving without a topic is looked up and the stored topic is written into the message.
 * Aliases greater than topic_alias_maximum_ and unknown aliases are logged as errors
 * and leave the message's topic unchanged.
 */
void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
  auto raw_alias = MQTTProperties_getNumericValue(&smart_message.contents->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS);

  std::optional<uint16_t> alias;
  if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
    alias = gsl::narrow<uint16_t>(raw_alias);
  }

  auto& topic = smart_message.topic;

  if (alias.has_value()) {
    if (*alias > topic_alias_maximum_) {
      logger_->log_error("Broker does not respect client's Topic Alias Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias, topic_alias_maximum_);
      return;
    }

    // if topic is empty, this is just a usage of a previously stored alias (look it up), otherwise a new one (store it)
    if (topic.empty()) {
      const auto iter = alias_to_topic_.find(*alias);
      if (iter == alias_to_topic_.end()) {
        logger_->log_error("Broker sent an alias that was not known to client before: %" PRIu16, *alias);
      } else {
        topic = iter->second;
      }
    } else {
      alias_to_topic_[*alias] = topic;
    }
  } else if (topic.empty()) {
    logger_->log_error("Received message without topic and alias");
  }
}

/**
 * Validates the combination of the configured properties.
 * Logs warnings for properties that are explicitly set but unused by the configured MQTT version
 * and for QoS 0 combined with a durable session.
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if a durable session is requested without a Client ID
 */
void ConsumeMQTT::checkProperties() {
  if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0 || mqtt_version_ == mqtt::MqttVersions::V_3_1_1 || mqtt_version_ == mqtt::MqttVersions::V_3X_AUTO) {
    if (isPropertyExplicitlySet(CleanStart)) {
      logger_->log_warn("MQTT 3.x specification does not support Clean Start. Property is not used.");
    }
    if (isPropertyExplicitlySet(SessionExpiryInterval)) {
      logger_->log_warn("MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.");
    }
    if (isPropertyExplicitlySet(AttributeFromContentType)) {
      logger_->log_warn("MQTT 3.x specification does not support Content Types and thus attributes cannot be created from them. Property is not used.");
    }
    if (isPropertyExplicitlySet(TopicAliasMaximum)) {
      logger_->log_warn("MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.");
    }
    if (isPropertyExplicitlySet(ReceiveMaximum)) {
      logger_->log_warn("MQTT 3.x specification does not support Receive Maximum. Property is not used.");
    }
  }

  if (mqtt_version_ == mqtt::MqttVersions::V_5_0 && isPropertyExplicitlySet(CleanSession)) {
    logger_->log_warn("MQTT 5.0 specification does not support Clean Session. Property is not used.");
  }

  if (clientID_.empty()) {
    if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
      if (session_expiry_interval_ > std::chrono::seconds(0)) {
        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions");
      }
    } else if (!clean_session_) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions");
    }
  }

  if (qos_ == mqtt::MqttQoS::LEVEL_0) {
    if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
      if (session_expiry_interval_ > std::chrono::seconds(0)) {
        logger_->log_warn("Messages are not preserved during client disconnection "
          "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
      }
    } else if (!clean_session_) {
      logger_->log_warn("Messages are not preserved during client disconnection "
        "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.");
    }
  }
}

/**
 * Checks the configuration against the limits stored in the broker related members.
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the topic contains wildcards while wildcard subscriptions
 *         are marked unavailable, if the session expiry interval exceeds the stored maximum, or if a
 *         "$share/" topic is used on MQTT 5.0 while shared subscriptions are marked unavailable
 */
void ConsumeMQTT::checkBrokerLimitsImpl() {
  auto hasWildcards = [] (std::string_view topic) {
    return std::any_of(topic.begin(), topic.end(), [] (const char ch) { return ch == '+' || ch == '#'; });
  };

  // explicit comparison with false is kept on purpose: only an explicit "not available" is an error
  if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {
    std::ostringstream os;
    os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has them";
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
  }

  if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_ > *maximum_session_expiry_interval_) {
    std::ostringstream os;
    os << "Set Session Expiry Interval (" << session_expiry_interval_.count() <<" s) is longer than the maximum supported by the broker (" << maximum_session_expiry_interval_->count() << " s).";
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
  }

  if (utils::StringUtils::startsWith(topic_, "$share/")) {
    if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
      // shared topic are supported on MQTT 5, unless explicitly denied by broker
      if (shared_subscription_available_ == false) {
        std::ostringstream os;
        os << "Shared topic feature with topic \"" << topic_ << "\" is not supported by broker";
        throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
      }
    } else {
      // the format string has a %s placeholder, so the topic must be passed along
      logger_->log_warn("Shared topic feature with topic \"%s\" might not be supported by broker on MQTT 3.x", topic_);
    }
  }
}

/**
 * Adds the ConsumeMQTT specific MQTT 5 properties to the connect properties:
 * Topic Alias Maximum when it is greater than 0, and Receive Maximum when it is
 * below MQTT_MAX_RECEIVE_MAXIMUM.
 */
void ConsumeMQTT::setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) const {
  if (topic_alias_maximum_ > 0) {
    MQTTProperty property{};
    property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM;
    property.value.integer2 = topic_alias_maximum_;
    MQTTProperties_add(&connect_props, &property);
  }

  if (receive_maximum_ < MQTT_MAX_RECEIVE_MAXIMUM) {
    MQTTProperty property{};
    property.identifier = MQTTPROPERTY_CODE_RECEIVE_MAXIMUM;
    property.value.integer2 = receive_maximum_;
    MQTTProperties_add(&connect_props, &property);
  }
}

// Static callbacks handed to the MQTT client library in startupClient(); `context` is the ConsumeMQTT
// instance registered there, the calls are forwarded to the corresponding member functions.

void ConsumeMQTT::subscriptionSuccess(void* context, MQTTAsync_successData* /*response*/) {
  auto* processor = static_cast<ConsumeMQTT*>(context);
  processor->onSubscriptionSuccess();
}

void ConsumeMQTT::subscriptionSuccess5(void* context, MQTTAsync_successData5* /*response*/) {
  auto* processor = static_cast<ConsumeMQTT*>(context);
  processor->onSubscriptionSuccess();
}

void ConsumeMQTT::subscriptionFailure(void* context, MQTTAsync_failureData* response) {
  auto* processor = static_cast<ConsumeMQTT*>(context);
  processor->onSubscriptionFailure(response);
}

void ConsumeMQTT::subscriptionFailure5(void* context, MQTTAsync_failureData5* response) {
  auto* processor = static_cast<ConsumeMQTT*>(context);
  processor->onSubscriptionFailure5(response);
}

/**
 * Logs the successful subscription.
 */
void ConsumeMQTT::onSubscriptionSuccess() {
  logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_);
}

/**
 * Logs the failure code and, if present, the detailed message of a failed subscription (MQTT 3.x callback).
 */
void ConsumeMQTT::onSubscriptionFailure(MQTTAsync_failureData* response) {
  logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);
  if (response->message != nullptr) {
    logger_->log_error("Detailed reason for subscription failure: %s", response->message);
  }
}

/**
 * Logs the failure code, the detailed message (if present) and the reason code of a failed subscription (MQTT 5 callback).
 */
void ConsumeMQTT::onSubscriptionFailure5(MQTTAsync_failureData5* response) {
  logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code);
  if (response->message != nullptr) {
    logger_->log_error("Detailed reason for subscription failure: %s", response->message);
  }
  logger_->log_error("Reason code for subscription failure: %d: %s", response->reasonCode, MQTTReasonCode_toString(response->reasonCode));
}

REGISTER_RESOURCE(ConsumeMQTT, Processor);

}  // namespace org::apache::nifi::minifi::processors