extensions/mqtt/processors/ConsumeMQTT.h (141 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <limits> #include <memory> #include <queue> #include <string> #include <unordered_map> #include <utility> #include "FlowFileRecord.h" #include "core/Core.h" #include "core/OutputAttributeDefinition.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/logging/LoggerFactory.h" #include "concurrentqueue.h" #include "AbstractMQTTProcessor.h" #include "utils/ArrayUtils.h" #include "utils/gsl.h" namespace org::apache::nifi::minifi::processors { class ConsumeMQTT : public processors::AbstractMQTTProcessor { public: explicit ConsumeMQTT(std::string_view name, const utils::Identifier& uuid = {}) : processors::AbstractMQTTProcessor(name, uuid) { logger_ = core::logging::LoggerFactory<ConsumeMQTT>::getLogger(uuid_); } EXTENSIONAPI static constexpr const char* Description = "This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. " "The the payload of the MQTT message becomes content of a FlowFile"; EXTENSIONAPI static constexpr auto Topic = core::PropertyDefinitionBuilder<>::createProperty("Topic") .withDescription("The topic to subscribe to.") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto CleanSession = core::PropertyDefinitionBuilder<>::createProperty("Clean Session") .withDescription("Whether to start afresh rather than remembering previous subscriptions. If true, then make broker forget subscriptions after disconnected. MQTT 3.x only.") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("true") .build(); EXTENSIONAPI static constexpr auto CleanStart = core::PropertyDefinitionBuilder<>::createProperty("Clean Start") .withDescription("Whether to start afresh rather than remembering previous subscriptions. MQTT 5.x only.") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("true") .build(); EXTENSIONAPI static constexpr auto SessionExpiryInterval = core::PropertyDefinitionBuilder<>::createProperty("Session Expiry Interval") .withDescription("Time to delete session on broker after client is disconnected. MQTT 5.x only.") .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) .withDefaultValue("0 s") .build(); EXTENSIONAPI static constexpr auto QueueBufferMaxMessage = core::PropertyDefinitionBuilder<>::createProperty("Queue Max Message") .withDescription("Maximum number of messages allowed on the received MQTT queue") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("1000") .build(); EXTENSIONAPI static constexpr auto AttributeFromContentType = core::PropertyDefinitionBuilder<>::createProperty("Attribute From Content Type") .withDescription("Name of FlowFile attribute to be filled from content type of received message. MQTT 5.x only.") .build(); EXTENSIONAPI static constexpr auto TopicAliasMaximum = core::PropertyDefinitionBuilder<>::createProperty("Topic Alias Maximum") .withDescription("Maximum number of topic aliases to use. If set to 0, then topic aliases cannot be used. MQTT 5.x only.") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("0") .build(); EXTENSIONAPI static constexpr auto ReceiveMaximum = core::PropertyDefinitionBuilder<>::createProperty("Receive Maximum") .withDescription("Maximum number of unacknowledged messages allowed. MQTT 5.x only.") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue(MQTT_MAX_RECEIVE_MAXIMUM_STR) .build(); EXTENSIONAPI static constexpr auto Properties = utils::array_cat(AbstractMQTTProcessor::BasicProperties, std::to_array<core::PropertyReference>({ Topic, CleanSession, CleanStart, SessionExpiryInterval, QueueBufferMaxMessage, AttributeFromContentType, TopicAliasMaximum, ReceiveMaximum }), AbstractMQTTProcessor::AdvancedProperties); EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; EXTENSIONAPI static constexpr auto BrokerOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.broker", {}, "URI of the sending broker"}; EXTENSIONAPI static constexpr auto TopicOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.topic", {}, "Topic of the message"}; EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{BrokerOutputAttribute, TopicOutputAttribute}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void readProperties(core::ProcessContext& context) override; void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; private: class WriteCallback { public: explicit WriteCallback(const SmartMessage& message, std::shared_ptr<core::logging::Logger> logger) : message_(message) , logger_(std::move(logger)) { } int64_t operator() (const std::shared_ptr<io::OutputStream>& stream); [[nodiscard]] bool getSuccessStatus() const { return success_status_; } private: const SmartMessage& message_; std::shared_ptr<core::logging::Logger> logger_; bool success_status_ = true; }; // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this" static void subscriptionSuccess(void* context, MQTTAsync_successData* response); static void subscriptionSuccess5(void* context, MQTTAsync_successData5* response); static void subscriptionFailure(void* context, MQTTAsync_failureData* response); static void subscriptionFailure5(void* context, MQTTAsync_failureData5* response); // MQTT non-static async callbacks void onSubscriptionSuccess(); void onSubscriptionFailure(MQTTAsync_failureData* response); void onSubscriptionFailure5(MQTTAsync_failureData5* response); void onMessageReceived(SmartMessage smart_message) override; /** * Enqueues received MQTT message into internal message queue. * Called as a callback on a separate thread than onTrigger, as a reaction to message incoming. * @param message message to put to queue */ void enqueueReceivedMQTTMsg(SmartMessage message); /** * Called in onTrigger to return the whole internal message queue * @return message queue of messages received since previous onTrigger */ std::queue<SmartMessage> getReceivedMqttMessages(); /** * Subscribes to topic */ void startupClient() override; void checkProperties() override; void checkBrokerLimitsImpl() override; /** * Resolve topic name if it was sent with an alias instead of a regular topic name * @param smart_message message to process */ void resolveTopicFromAlias(SmartMessage& smart_message); bool getCleanSession() const override { return clean_session_; } bool getCleanStart() const override { return clean_start_; } std::chrono::seconds getSessionExpiryInterval() const override { return session_expiry_interval_; } /** * Turn MQTT 5 User Properties to Flow File attributes */ void putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) const; /** * Fill a user-requested Flow File attribute from content type */ void fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) const; void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) const override; std::string topic_; bool clean_session_ = true; bool clean_start_ = true; std::chrono::seconds session_expiry_interval_{0}; uint64_t max_queue_size_ = 1000; std::string attribute_from_content_type_; uint16_t topic_alias_maximum_{0}; uint16_t receive_maximum_{MQTT_MAX_RECEIVE_MAXIMUM}; std::unordered_map<uint16_t, std::string> alias_to_topic_; moodycamel::ConcurrentQueue<SmartMessage> queue_; }; } // namespace org::apache::nifi::minifi::processors