extensions/mqtt/processors/AbstractMQTTProcessor.cpp (361 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AbstractMQTTProcessor.h"
#include <memory>
#include <string>
#include <utility>
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
#include "core/ProcessContext.h"
namespace org::apache::nifi::minifi::processors {
void AbstractMQTTProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
uri_ = utils::parseProperty(context, BrokerURI);
mqtt_version_ = utils::parseEnumProperty<mqtt::MqttVersions>(context, MqttVersion);
if (auto value = context.getProperty(ClientID)) {
clientID_ = std::move(*value);
} else if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 3.1.0 specification does not support empty client IDs");
}
if (auto value = context.getProperty(Username)) {
username_ = std::move(*value);
}
logger_->log_debug("AbstractMQTTProcessor: Username [{}]", username_);
if (auto value = context.getProperty(Password)) {
password_ = std::move(*value);
}
logger_->log_debug("AbstractMQTTProcessor: Password [{}]", password_);
keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(utils::parseDurationProperty(context, KeepAliveInterval));
logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [{}] s", int64_t{keep_alive_interval_.count()});
connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(utils::parseDurationProperty(context, ConnectionTimeout));
logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [{}] s", int64_t{connection_timeout_.count()});
qos_ = utils::parseEnumProperty<mqtt::MqttQoS>(context, QoS);
logger_->log_debug("AbstractMQTTProcessor: QoS [{}]", magic_enum::enum_name(qos_));
if (const auto security_protocol = context.getProperty(SecurityProtocol)) {
if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
sslOpts_ = MQTTAsync_SSLOptions_initializer;
if (auto value = context.getProperty(SecurityCA)) {
logger_->log_debug("AbstractMQTTProcessor: trustStore [{}]", *value);
securityCA_ = std::move(*value);
sslOpts_->trustStore = securityCA_.c_str();
}
if (auto value = context.getProperty(SecurityCert)) {
logger_->log_debug("AbstractMQTTProcessor: keyStore [{}]", *value);
securityCert_ = std::move(*value);
sslOpts_->keyStore = securityCert_.c_str();
}
if (auto value = context.getProperty(SecurityPrivateKey)) {
logger_->log_debug("AbstractMQTTProcessor: privateKey [{}]", *value);
securityPrivateKey_ = std::move(*value);
sslOpts_->privateKey = securityPrivateKey_.c_str();
}
if (auto value = context.getProperty(SecurityPrivateKeyPassword)) {
logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [{}]", *value);
securityPrivateKeyPassword_ = std::move(*value);
sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
}
}
}
if (auto last_will_topic = context.getProperty(LastWillTopic); last_will_topic.has_value() && !last_will_topic->empty()) {
last_will_ = MQTTAsync_willOptions_initializer;
logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [{}]", *last_will_topic);
last_will_topic_ = std::move(*last_will_topic);
last_will_->topicName = last_will_topic_.c_str();
if (auto value = context.getProperty(LastWillMessage)) {
logger_->log_debug("AbstractMQTTProcessor: Last Will Message [{}]", *value);
last_will_message_ = std::move(*value);
last_will_->message = last_will_message_.c_str();
}
last_will_qos_ = utils::parseEnumProperty<mqtt::MqttQoS>(context, LastWillQoS);
logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [{}]", magic_enum::enum_name(last_will_qos_));
last_will_->qos = static_cast<int>(last_will_qos_);
last_will_retain_ = utils::parseBoolProperty(context, LastWillRetain);
last_will_->retained = last_will_retain_;
if (auto value = context.getProperty(LastWillContentType)) {
logger_->log_debug("AbstractMQTTProcessor: Last Will Content Type [{}]", *value);
last_will_content_type_ = std::move(*value);
}
}
readProperties(context);
checkProperties();
initializeClient();
}
void AbstractMQTTProcessor::initializeClient() {
// write lock
std::lock_guard client_lock{client_mutex_};
if (!client_) {
MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
options.MQTTVersion = MQTTVERSION_5;
}
if (MQTTAsync_createWithOptions(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr, &options) != MQTTASYNC_SUCCESS) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Creating MQTT client failed");
}
}
if (client_) {
if (MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, nullptr) == MQTTASYNC_FAILURE) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Setting MQTT client callbacks failed");
}
// call reconnect to bootstrap
reconnect();
}
}
void AbstractMQTTProcessor::reconnect() {
if (!client_) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect");
}
if (MQTTAsync_isConnected(client_)) {
logger_->log_debug("Already connected to {}, no need to reconnect", uri_);
return;
}
MQTTProperties connect_properties = MQTTProperties_initializer;
MQTTProperties will_properties = MQTTProperties_initializer;
ConnectFinishedTask connect_finished_task(
[this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
onConnectFinished(success_data, success_data_5, failure_data, failure_data_5);
});
const MQTTAsync_connectOptions connect_options = createConnectOptions(connect_properties, will_properties, connect_finished_task);
logger_->log_info("Reconnecting to {}", uri_);
if (MQTTAsync_isConnected(client_)) {
logger_->log_debug("Already connected to {}, no need to reconnect", uri_);
return;
}
const int ret = MQTTAsync_connect(client_, &connect_options);
MQTTProperties_free(&connect_properties);
if (ret != MQTTASYNC_SUCCESS) {
logger_->log_error("MQTTAsync_connect failed to MQTT broker {} with error code [{}]", uri_, ret);
return;
}
// wait until connection succeeds or fails
connect_finished_task.get_future().get();
}
MQTTAsync_connectOptions AbstractMQTTProcessor::createConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties, ConnectFinishedTask& connect_finished_task) {
MQTTAsync_connectOptions connect_options = [this, &connect_properties, &will_properties] {
if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
return createMqtt5ConnectOptions(connect_properties, will_properties);
} else {
return createMqtt3ConnectOptions();
}
}();
connect_options.context = &connect_finished_task;
connect_options.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
connect_options.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
if (!username_.empty()) {
connect_options.username = username_.c_str();
connect_options.password = password_.c_str();
}
if (sslOpts_) {
connect_options.ssl = &*sslOpts_;
}
if (last_will_) {
connect_options.will = &*last_will_;
}
return connect_options;
}
MQTTAsync_connectOptions AbstractMQTTProcessor::createMqtt3ConnectOptions() const {
MQTTAsync_connectOptions connect_options = MQTTAsync_connectOptions_initializer;
connect_options.onSuccess = connectionSuccess;
connect_options.onFailure = connectionFailure;
connect_options.cleansession = getCleanSession();
if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0) {
connect_options.MQTTVersion = MQTTVERSION_3_1;
} else if (mqtt_version_ == mqtt::MqttVersions::V_3_1_1) {
connect_options.MQTTVersion = MQTTVERSION_3_1_1;
}
return connect_options;
}
MQTTAsync_connectOptions AbstractMQTTProcessor::createMqtt5ConnectOptions(MQTTProperties& connect_properties, MQTTProperties& will_properties) const {
MQTTAsync_connectOptions connect_options = MQTTAsync_connectOptions_initializer5;
connect_options.onSuccess5 = connectionSuccess5;
connect_options.onFailure5 = connectionFailure5;
connect_options.connectProperties = &connect_properties;
connect_options.cleanstart = getCleanStart();
{
MQTTProperty property;
property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
property.value.integer4 = gsl::narrow<unsigned int>(getSessionExpiryInterval().count()); // NOLINT(cppcoreguidelines-pro-type-union-access)
MQTTProperties_add(&connect_properties, &property);
}
if (!last_will_content_type_.empty()) {
MQTTProperty property;
property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
property.value.data.len = gsl::narrow<int>(last_will_content_type_.length()); // NOLINT(cppcoreguidelines-pro-type-union-access)
property.value.data.data = const_cast<char*>(last_will_content_type_.data()); // NOLINT(cppcoreguidelines-pro-type-union-access)
MQTTProperties_add(&will_properties, &property);
}
connect_options.willProperties = &will_properties;
setProcessorSpecificMqtt5ConnectOptions(connect_properties);
return connect_options;
}
void AbstractMQTTProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
std::shared_lock client_lock{client_mutex_};
if (client_ == nullptr) {
logger_->log_debug("Null-op in onTrigger, processor is shutting down.");
return;
}
reconnect();
if (!MQTTAsync_isConnected(client_)) {
logger_->log_error("Could not work with MQTT broker because disconnected to {}", uri_);
yield();
return;
}
onTriggerImpl(context, session);
}
void AbstractMQTTProcessor::freeResources() {
// write lock
std::lock_guard client_lock{client_mutex_};
if (!client_) {
return;
}
disconnect();
MQTTAsync_destroy(&client_);
}
void AbstractMQTTProcessor::disconnect() {
if (!MQTTAsync_isConnected(client_)) {
return;
}
MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer;
ConnectFinishedTask disconnect_finished_task(
[this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5);
});
disconnect_options.context = &disconnect_finished_task;
if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
disconnect_options.onSuccess5 = connectionSuccess5;
disconnect_options.onFailure5 = connectionFailure5;
} else {
disconnect_options.onSuccess = connectionSuccess;
disconnect_options.onFailure = connectionFailure;
}
disconnect_options.timeout = gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
if (ret != MQTTASYNC_SUCCESS) {
logger_->log_error("MQTTAsync_disconnect failed to MQTT broker {} with error code [{}]", uri_, ret);
return;
}
// wait until connection succeeds or fails
disconnect_finished_task.get_future().get();
}
void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) {
const auto value = MQTTProperties_getNumericValue(&response->properties, property_code);
if (value != PAHO_MQTT_C_FAILURE_CODE) {
if constexpr (std::is_same_v<decltype(out_var), std::optional<std::chrono::seconds>&>) {
out_var = std::chrono::seconds(value);
} else {
out_var = gsl::narrow<typename std::remove_reference_t<decltype(out_var)>::value_type>(value);
}
} else {
out_var.reset();
}
};
readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, wildcard_subscription_available_);
readProperty(MQTTPROPERTY_CODE_SHARED_SUBSCRIPTION_AVAILABLE, shared_subscription_available_);
readProperty(MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM, broker_topic_alias_maximum_);
readProperty(MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, broker_receive_maximum_);
readProperty(MQTTPROPERTY_CODE_MAXIMUM_QOS, maximum_qos_);
readProperty(MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE, maximum_packet_size_);
readProperty(MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL, maximum_session_expiry_interval_);
readProperty(MQTTPROPERTY_CODE_SERVER_KEEP_ALIVE, server_keep_alive_);
}
void AbstractMQTTProcessor::checkBrokerLimits() {
try {
if (server_keep_alive_.has_value() && server_keep_alive_ < keep_alive_interval_) {
std::ostringstream os;
os << "Set Keep Alive Interval (" << keep_alive_interval_.count() << " s) is longer than the maximum supported by the broker (" << server_keep_alive_->count() << " s)";
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, os.str());
}
if (maximum_qos_.has_value() && static_cast<uint8_t>(qos_) > maximum_qos_) {
std::ostringstream os;
os << "Set QoS (" << static_cast<uint8_t>(qos_) << ") is higher than the maximum supported by the broker (" << *maximum_qos_ << ")";
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, os.str());
}
checkBrokerLimitsImpl();
}
catch (...) {
disconnect();
throw;
}
}
void AbstractMQTTProcessor::connectionLost(void *context, char* cause) {
auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
processor->onConnectionLost(cause);
}
void AbstractMQTTProcessor::connectionSuccess(void* context, MQTTAsync_successData* response) {
auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
(*task)(response, nullptr, nullptr, nullptr);
}
void AbstractMQTTProcessor::connectionSuccess5(void* context, MQTTAsync_successData5* response) {
auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
(*task)(nullptr, response, nullptr, nullptr);
}
void AbstractMQTTProcessor::connectionFailure(void* context, MQTTAsync_failureData* response) {
auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
(*task)(nullptr, nullptr, response, nullptr);
}
void AbstractMQTTProcessor::connectionFailure5(void* context, MQTTAsync_failureData5* response) {
auto* task = reinterpret_cast<ConnectFinishedTask*>(context);
(*task)(nullptr, nullptr, nullptr, response);
}
int AbstractMQTTProcessor::msgReceived(void *context, char* topic_name, int topic_len, MQTTAsync_message* message) {
auto* processor = reinterpret_cast<AbstractMQTTProcessor*>(context);
processor->onMessageReceived(SmartMessage{std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>(message), std::string(topic_name, topic_len)});
MQTTAsync_free(topic_name);
return 1;
}
void AbstractMQTTProcessor::onConnectionLost(char* cause) {
logger_->log_error("Connection lost to MQTT broker {}", uri_);
if (cause != nullptr) {
logger_->log_error("Cause for connection loss: {}", cause);
}
}
void AbstractMQTTProcessor::onConnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5,
MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
if (success_data) {
logger_->log_info("Successfully connected to MQTT broker {}", uri_);
startupClient();
return;
}
if (success_data_5) {
logger_->log_info("Successfully connected to MQTT broker {}", uri_);
logger_->log_info("Reason code for connection success: {}: {}", magic_enum::enum_underlying(success_data_5->reasonCode), MQTTReasonCode_toString(success_data_5->reasonCode));
setBrokerLimits(success_data_5);
checkBrokerLimits();
startupClient();
return;
}
if (failure_data) {
logger_->log_error("Connection failed to MQTT broker {} ({})", uri_, failure_data->code);
if (failure_data->message != nullptr) {
logger_->log_error("Detailed reason for connection failure: {}", failure_data->message);
}
return;
}
if (failure_data_5) {
logger_->log_error("Connection failed to MQTT broker {} ({})", uri_, failure_data_5->code);
if (failure_data_5->message != nullptr) {
logger_->log_error("Detailed reason for connection failure: {}", failure_data_5->message);
}
logger_->log_error("Reason code for connection failure: {}: {}", magic_enum::enum_underlying(failure_data_5->reasonCode), MQTTReasonCode_toString(failure_data_5->reasonCode));
}
}
void AbstractMQTTProcessor::onDisconnectFinished(MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5,
MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
if (success_data) {
logger_->log_info("Successfully disconnected from MQTT broker {}", uri_);
return;
}
if (success_data_5) {
logger_->log_info("Successfully disconnected from MQTT broker {}", uri_);
logger_->log_info("Reason code for disconnection success: {}: {}", magic_enum::enum_underlying(success_data_5->reasonCode), MQTTReasonCode_toString(success_data_5->reasonCode));
return;
}
if (failure_data) {
logger_->log_error("Disconnection failed from MQTT broker {} ({})", uri_, failure_data->code);
if (failure_data->message != nullptr) {
logger_->log_error("Detailed reason for disconnection failure: {}", failure_data->message);
}
return;
}
if (failure_data_5) {
logger_->log_error("Disconnection failed from MQTT broker {} ({})", uri_, failure_data_5->code);
if (failure_data_5->message != nullptr) {
logger_->log_error("Detailed reason for disconnection failure: {}", failure_data_5->message);
}
logger_->log_error("Reason code for disconnection failure: {}: {}", magic_enum::enum_underlying(failure_data_5->reasonCode), MQTTReasonCode_toString(failure_data_5->reasonCode));
}
}
} // namespace org::apache::nifi::minifi::processors