extensions/mqtt/processors/PublishMQTT.h (113 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "core/RelationshipDefinition.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/Core.h"
#include "core/Property.h"
#include "core/logging/LoggerFactory.h"
#include "AbstractMQTTProcessor.h"
#include "utils/ArrayUtils.h"
#include "utils/gsl.h"
#include "core/ProcessorMetrics.h"
namespace org::apache::nifi::minifi::processors {
class PublishMQTT : public processors::AbstractMQTTProcessor {
public:
explicit PublishMQTT(const std::string_view name, const utils::Identifier& uuid = {})
: processors::AbstractMQTTProcessor(name, uuid) {
metrics_ = gsl::make_not_null(std::make_shared<PublishMQTTMetrics>(*this, in_flight_message_counter_));
logger_ = core::logging::LoggerFactory<PublishMQTT>::getLogger(uuid_);
}
EXTENSIONAPI static constexpr const char* Description = "PublishMQTT serializes FlowFile content as an MQTT payload, sending the message to the configured topic and broker.";
EXTENSIONAPI static constexpr auto Topic = core::PropertyDefinitionBuilder<>::createProperty("Topic")
.withDescription("The topic to publish to.")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto Retain = core::PropertyDefinitionBuilder<>::createProperty("Retain")
.withDescription("Retain published message in broker")
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
.withDefaultValue("false")
.build();
EXTENSIONAPI static constexpr auto MessageExpiryInterval = core::PropertyDefinitionBuilder<>::createProperty("Message Expiry Interval")
.withDescription("Time while message is valid and will be forwarded by broker. MQTT 5.x only.")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto ContentType = core::PropertyDefinitionBuilder<>::createProperty("Content Type")
.withDescription("Content type of the message. MQTT 5.x only.")
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(AbstractMQTTProcessor::BasicProperties, std::to_array<core::PropertyReference>({
Topic,
Retain,
MessageExpiryInterval,
ContentType
}), AbstractMQTTProcessor::AdvancedProperties);
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"};
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles that failed to be sent to the destination are transferred to this relationship"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void readProperties(core::ProcessContext& context) override;
void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;
private:
/**
* Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's Receive Maximum
*/
class InFlightMessageCounter {
public:
void setEnabled(bool status) { enabled_ = status; }
void setMax(uint16_t new_limit);
void increase();
void decrease();
uint16_t getCounter() const;
private:
bool enabled_ = false;
mutable std::mutex mutex_;
std::condition_variable cv_;
uint16_t counter_{0};
uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
};
class PublishMQTTMetrics : public core::ProcessorMetricsImpl {
public:
PublishMQTTMetrics(const core::Processor& source_processor, const InFlightMessageCounter& in_flight_message_counter);
std::vector<state::response::SerializedResponseNode> serialize() override;
std::vector<state::PublishedMetric> calculateMetrics() override;
private:
gsl::not_null<const InFlightMessageCounter*> in_flight_message_counter_;
};
// MQTT static async callbacks, calling their notify with context being pointer to a packaged_task to notify()
static void sendSuccess(void* context, MQTTAsync_successData* response);
static void sendSuccess5(void* context, MQTTAsync_successData5* response);
static void sendFailure(void* context, MQTTAsync_failureData* response);
static void sendFailure5(void* context, MQTTAsync_failureData5* response);
/**
* Resolves topic from expression language
*/
std::string getTopic(core::ProcessContext& context, const core::FlowFile* const flow_file) const;
/**
* Resolves content type from expression language
*/
std::string getContentType(core::ProcessContext& context, const core::FlowFile* const flow_file) const;
/**
* Sends an MQTT message asynchronously
* @param buffer contents of the message
* @param topic topic of the message
* @param content_type Content Type for MQTT 5
* @param flow_file Flow File being processed
* @return success of message sending
*/
bool sendMessage(const std::vector<std::byte>& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file);
/**
* Callback for asynchronous message sending
* @param success if message sending was successful
* @param response_code response code for failure only
* @param reason_code MQTT 5 reason code
* @return if message sending was successful
*/
bool notify(bool success, std::optional<int> response_code, std::optional<MQTTReasonCodes> reason_code);
/**
* Set MQTT 5-exclusive properties
* @param message message object
* @param content_type content type
* @param flow_file Flow File being processed
*/
void setMqtt5Properties(MQTTAsync_message& message, const std::string& content_type, const std::shared_ptr<core::FlowFile>& flow_file) const;
/**
* Adds flow file attributes as user properties to an MQTT 5 message
* @param message message object
* @param flow_file Flow File being processed
*/
static void addAttributesAsUserProperties(MQTTAsync_message& message, const std::shared_ptr<core::FlowFile>& flow_file);
bool getCleanSession() const override {
return true;
}
bool getCleanStart() const override {
return true;
}
std::chrono::seconds getSessionExpiryInterval() const override {
// non-persistent session as we only publish
return std::chrono::seconds{0};
}
void startupClient() override {
// there is no need to do anything like subscribe in the beginning
}
void checkProperties() override;
void checkBrokerLimitsImpl() override;
bool retain_ = false;
std::optional<std::chrono::seconds> message_expiry_interval_;
InFlightMessageCounter in_flight_message_counter_;
};
} // namespace org::apache::nifi::minifi::processors