src/AwsIotSender.cpp (128 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 #include "aws/iotfleetwise/AwsIotSender.h" #include "aws/iotfleetwise/AwsSDKMemoryManager.h" #include "aws/iotfleetwise/IConnectionTypes.h" #include "aws/iotfleetwise/IConnectivityModule.h" #include "aws/iotfleetwise/LoggingModule.h" #include <aws/crt/Api.h> #include <aws/crt/Types.h> #include <aws/crt/mqtt/Mqtt5Packets.h> #include <functional> namespace Aws { namespace IoTFleetWise { AwsIotSender::AwsIotSender( const IConnectivityModule *connectivityModule, MqttClientWrapper &mqttClient, const TopicConfig &topicConfig ) : mConnectivityModule( connectivityModule ) , mMqttClient( mqttClient ) , mTopicConfig( topicConfig ) { } bool AwsIotSender::isAlive() { std::lock_guard<std::mutex> connectivityLock( mConnectivityMutex ); return isAliveNotThreadSafe(); } bool AwsIotSender::isAliveNotThreadSafe() { if ( mConnectivityModule == nullptr ) { return false; } return mConnectivityModule->isAlive(); } size_t AwsIotSender::getMaxSendSize() const { return AWS_IOT_MAX_MESSAGE_SIZE; } void AwsIotSender::sendBuffer( const std::string &topic, const uint8_t *buf, size_t size, OnDataSentCallback callback, QoS qos ) { std::lock_guard<std::mutex> connectivityLock( mConnectivityMutex ); if ( topic.empty() ) { FWE_LOG_WARN( "Invalid topic provided" ); callback( ConnectivityError::NotConfigured ); return; } if ( ( buf == nullptr ) || ( size == 0 ) ) { FWE_LOG_WARN( "No valid data provided" ); callback( ConnectivityError::WrongInputData ); return; } if ( size > getMaxSendSize() ) { FWE_LOG_WARN( "Payload provided is too long" ); callback( ConnectivityError::WrongInputData ); return; } if ( !isAliveNotThreadSafe() ) { callback( ConnectivityError::NoConnection ); return; } if ( !AwsSDKMemoryManager::getInstance().reserveMemory( size ) ) { FWE_LOG_ERROR( "Not sending out the message with size " + std::to_string( size ) + " because IoT device SDK allocated the maximum defined memory." ); callback( ConnectivityError::QuotaReached ); return; } auto sdkQos = Aws::Crt::Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE; switch ( qos ) { case QoS::AT_MOST_ONCE: sdkQos = Aws::Crt::Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE; break; case QoS::AT_LEAST_ONCE: sdkQos = Aws::Crt::Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE; break; } publishMessageToTopic( topic, buf, size, callback, sdkQos ); } void AwsIotSender::publishMessageToTopic( const std::string &topic, const uint8_t *buf, size_t size, OnDataSentCallback callback, Aws::Crt::Mqtt5::QOS qos ) { auto payload = Aws::Crt::ByteBufFromArray( buf, size ); auto onPublishComplete = // coverity[autosar_cpp14_a8_4_11_violation] smart pointer needed to match the expected signature [size, callback, this]( int errorCode, std::shared_ptr<Aws::Crt::Mqtt5::PublishResult> result ) mutable { { AwsSDKMemoryManager::getInstance().releaseReservedMemory( size ); } if ( !result->wasSuccessful() ) { auto errorString = Aws::Crt::ErrorDebugString( errorCode ); auto logMessage = "Publish failed with error: " + ( errorString != nullptr ? std::string( errorString ) : std::string( "Unknown error" ) ); if ( errorCode == AWS_ERROR_MQTT5_USER_REQUESTED_STOP ) { FWE_LOG_TRACE( logMessage ); } else { FWE_LOG_ERROR( logMessage ); } callback( ConnectivityError::TransmissionError ); return; } FWE_LOG_TRACE( "Publish succeeded" ); mPayloadCountSent++; callback( ConnectivityError::Success ); }; std::shared_ptr<Aws::Crt::Mqtt5::PublishPacket> publishPacket = std::make_shared<Aws::Crt::Mqtt5::PublishPacket>( topic.c_str(), Aws::Crt::ByteCursorFromByteBuf( payload ), qos ); if ( !mMqttClient.Publish( publishPacket, onPublishComplete ) ) { callback( ConnectivityError::TransmissionError ); } } } // namespace IoTFleetWise } // namespace Aws