extensions/mqtt/processors/AbstractMQTTProcessor.h (245 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <limits> #include <memory> #include <string> #include <utility> #include <vector> #include <shared_mutex> #include "core/PropertyDefinition.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/PropertyDefinitionBuilder.h" #include "core/Core.h" #include "core/logging/LoggerFactory.h" #include "utils/Enum.h" #include "MQTTAsync.h" namespace org::apache::nifi::minifi::processors::mqtt { enum class MqttVersions { V_3X_AUTO, V_3_1_0, V_3_1_1, V_5_0 }; enum class MqttQoS : uint8_t { LEVEL_0, LEVEL_1, LEVEL_2 }; } // namespace org::apache::nifi::minifi::processors::mqtt namespace magic_enum::customize { using MqttVersions = org::apache::nifi::minifi::processors::mqtt::MqttVersions; using MqttQoS = org::apache::nifi::minifi::processors::mqtt::MqttQoS; template <> constexpr customize_t enum_name<MqttVersions>(MqttVersions value) noexcept { switch (value) { case MqttVersions::V_3X_AUTO: return "3.x AUTO"; case MqttVersions::V_3_1_0: return "3.1.0"; case MqttVersions::V_3_1_1: return "3.1.1"; case MqttVersions::V_5_0: return "5.0"; } return invalid_tag; } template <> constexpr customize_t enum_name<MqttQoS>(MqttQoS value) noexcept { switch (value) { case MqttQoS::LEVEL_0: return "0"; case MqttQoS::LEVEL_1: return "1"; case MqttQoS::LEVEL_2: return "2"; } return invalid_tag; } } // namespace magic_enum::customize namespace org::apache::nifi::minifi::processors { static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL = "ssl"; class AbstractMQTTProcessor : public core::ProcessorImpl { public: explicit AbstractMQTTProcessor(std::string_view name, const utils::Identifier& uuid = {}, std::shared_ptr<core::ProcessorMetrics> metrics = {}) : core::ProcessorImpl(name, uuid, std::move(metrics)) { } ~AbstractMQTTProcessor() override { freeResources(); } EXTENSIONAPI static constexpr auto BrokerURI = core::PropertyDefinitionBuilder<>::createProperty("Broker URI") .withDescription("The URI to use to connect to the MQTT broker") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ClientID = core::PropertyDefinitionBuilder<>::createProperty("Client ID") .withDescription("MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!") .build(); EXTENSIONAPI static constexpr auto QoS = core::PropertyDefinitionBuilder<magic_enum::enum_count<mqtt::MqttQoS>()>::createProperty("Quality of Service") .withDescription("The Quality of Service (QoS) of messages.") .withDefaultValue(magic_enum::enum_name(mqtt::MqttQoS::LEVEL_0)) .withAllowedValues(magic_enum::enum_names<mqtt::MqttQoS>()) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto MqttVersion = core::PropertyDefinitionBuilder<magic_enum::enum_count<mqtt::MqttVersions>()>::createProperty("MQTT Version") .withDescription("The MQTT specification version when connecting to the broker.") .withDefaultValue(magic_enum::enum_name(mqtt::MqttVersions::V_3X_AUTO)) .withAllowedValues(magic_enum::enum_names<mqtt::MqttVersions>()) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ConnectionTimeout = core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout") .withDescription("Maximum time interval the client will wait for the network connection to the MQTT broker") .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) .withDefaultValue("10 sec") .build(); EXTENSIONAPI static constexpr auto KeepAliveInterval = core::PropertyDefinitionBuilder<>::createProperty("Keep Alive Interval") .withDescription("Defines the maximum time interval between messages sent or received") .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) .withDefaultValue("60 sec") .build(); EXTENSIONAPI static constexpr auto LastWillTopic = core::PropertyDefinitionBuilder<>::createProperty("Last Will Topic") .withDescription("The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent") .build(); EXTENSIONAPI static constexpr auto LastWillMessage = core::PropertyDefinitionBuilder<>::createProperty("Last Will Message") .withDescription("The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker") .build(); EXTENSIONAPI static constexpr auto LastWillQoS = core::PropertyDefinitionBuilder<magic_enum::enum_count<mqtt::MqttQoS>()>::createProperty("Last Will QoS") .withDescription("The Quality of Service (QoS) to send the last will with.") .withDefaultValue(magic_enum::enum_name(mqtt::MqttQoS::LEVEL_0)) .withAllowedValues(magic_enum::enum_names<mqtt::MqttQoS>()) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto LastWillRetain = core::PropertyDefinitionBuilder<>::createProperty("Last Will Retain") .withDescription("Whether to retain the client's Last Will") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("false") .build(); EXTENSIONAPI static constexpr auto LastWillContentType = core::PropertyDefinitionBuilder<>::createProperty("Last Will Content Type") .withDescription("Content type of the client's Last Will. MQTT 5.x only.") .build(); EXTENSIONAPI static constexpr auto Username = core::PropertyDefinitionBuilder<>::createProperty("Username") .withDescription("Username to use when connecting to the broker") .build(); EXTENSIONAPI static constexpr auto Password = core::PropertyDefinitionBuilder<>::createProperty("Password") .withDescription("Password to use when connecting to the broker") .isSensitive(true) .build(); EXTENSIONAPI static constexpr auto SecurityProtocol = core::PropertyDefinitionBuilder<>::createProperty("Security Protocol") .withDescription("Protocol used to communicate with brokers") .build(); EXTENSIONAPI static constexpr auto SecurityCA = core::PropertyDefinitionBuilder<>::createProperty("Security CA") .withDescription("File or directory path to CA certificate(s) for verifying the broker's key") .build(); EXTENSIONAPI static constexpr auto SecurityCert = core::PropertyDefinitionBuilder<>::createProperty("Security Cert") .withDescription("Path to client's public key (PEM) used for authentication") .build(); EXTENSIONAPI static constexpr auto SecurityPrivateKey = core::PropertyDefinitionBuilder<>::createProperty("Security Private Key") .withDescription("Path to client's private key (PEM) used for authentication") .build(); EXTENSIONAPI static constexpr auto SecurityPrivateKeyPassword = core::PropertyDefinitionBuilder<>::createProperty("Security Pass Phrase") .withDescription("Private key passphrase") .isSensitive(true) .build(); EXTENSIONAPI static constexpr auto BasicProperties = std::to_array<core::PropertyReference>({ BrokerURI, ClientID, MqttVersion }); EXTENSIONAPI static constexpr auto AdvancedProperties = std::to_array<core::PropertyReference>({ QoS, ConnectionTimeout, KeepAliveInterval, LastWillTopic, LastWillMessage, LastWillQoS, LastWillRetain, LastWillContentType, Username, Password, SecurityProtocol, SecurityCA, SecurityCert, SecurityPrivateKey, SecurityPrivateKeyPassword }); void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void notifyStop() override { freeResources(); } protected: struct MQTTMessageDeleter { void operator()(MQTTAsync_message* message) { MQTTAsync_freeMessage(&message); } }; struct SmartMessage { std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> contents; std::string topic; }; // defined by Paho MQTT C library static constexpr int PAHO_MQTT_C_FAILURE_CODE = -9999999; static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535; static constexpr std::string_view MQTT_MAX_RECEIVE_MAXIMUM_STR = "65535"; /** * Connect to MQTT broker. Synchronously waits until connection succeeds or fails. */ void reconnect(); /** * Checks property consistency before connecting to broker */ virtual void checkProperties() { } /** * Checks broker limits and supported features vs our desired features after connecting to broker */ void checkBrokerLimits(); virtual void checkBrokerLimitsImpl() = 0; // variables being used for a synchronous connection and disconnection std::shared_mutex client_mutex_; MQTTAsync client_ = nullptr; std::string uri_; std::chrono::seconds keep_alive_interval_{60}; std::chrono::seconds connection_timeout_{10}; mqtt::MqttQoS qos_{mqtt::MqttQoS::LEVEL_0}; std::string clientID_; std::string username_; std::string password_; mqtt::MqttVersions mqtt_version_{mqtt::MqttVersions::V_3X_AUTO}; // Supported operations std::optional<bool> retain_available_; std::optional<bool> wildcard_subscription_available_; std::optional<bool> shared_subscription_available_; std::optional<uint16_t> broker_topic_alias_maximum_; std::optional<uint16_t> broker_receive_maximum_; std::optional<uint8_t> maximum_qos_; std::optional<uint32_t> maximum_packet_size_; std::optional<std::chrono::seconds> maximum_session_expiry_interval_; std::optional<std::chrono::seconds> server_keep_alive_; private: using ConnectFinishedTask = std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*, MQTTAsync_failureData*, MQTTAsync_failureData5*)>; /** * Initializes local MQTT client and connects to broker. */ void initializeClient(); /** * Calls disconnect() and releases local MQTT client */ void freeResources(); /** * Disconnect from MQTT broker. Synchronously waits until disconnection succeeds or fails. */ void disconnect(); virtual void readProperties(core::ProcessContext& context) = 0; virtual void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) = 0; virtual void startupClient() = 0; void setBrokerLimits(MQTTAsync_successData5* response); // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this" static void connectionLost(void *context, char* cause); static void connectionSuccess(void* context, MQTTAsync_successData* response); static void connectionSuccess5(void* context, MQTTAsync_successData5* response); static void connectionFailure(void* context, MQTTAsync_failureData* response); static void connectionFailure5(void* context, MQTTAsync_failureData5* response); static int msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message); // MQTT async callback methods void onConnectionLost(char* cause); void onConnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5); void onDisconnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5); /** * Called if message is received. This is default implementation, to be overridden if subclass wants to use the message. * @param topic topic of message * @param message MQTT message */ virtual void onMessageReceived(SmartMessage /*smartmessage*/) { } virtual bool getCleanSession() const = 0; virtual bool getCleanStart() const = 0; virtual std::chrono::seconds getSessionExpiryInterval() const = 0; MQTTAsync_connectOptions createConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties, ConnectFinishedTask& connect_finished_task); MQTTAsync_connectOptions createMqtt3ConnectOptions() const; MQTTAsync_connectOptions createMqtt5ConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties) const; virtual void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& /*connect_props*/) const { } // SSL std::optional<MQTTAsync_SSLOptions> sslOpts_; std::string securityCA_; std::string securityCert_; std::string securityPrivateKey_; std::string securityPrivateKeyPassword_; // Last Will std::optional<MQTTAsync_willOptions> last_will_; std::string last_will_topic_; std::string last_will_message_; mqtt::MqttQoS last_will_qos_{mqtt::MqttQoS::LEVEL_0}; bool last_will_retain_ = false; std::string last_will_content_type_; }; } // namespace org::apache::nifi::minifi::processors