src/AwsGreengrassV2Sender.cpp (141 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 #include "aws/iotfleetwise/AwsGreengrassV2Sender.h" #include "aws/iotfleetwise/AwsSDKMemoryManager.h" #include "aws/iotfleetwise/IConnectionTypes.h" #include "aws/iotfleetwise/IConnectivityModule.h" #include "aws/iotfleetwise/LoggingModule.h" #include <aws/crt/Optional.h> #include <aws/crt/Types.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> #include <chrono> #include <functional> #include <future> #include <memory> namespace Aws { namespace IoTFleetWise { AwsGreengrassV2Sender::AwsGreengrassV2Sender( IConnectivityModule *connectivityModule, AwsGreengrassCoreIpcClientWrapper &greengrassClientWrapper, const TopicConfig &topicConfig ) : mConnectivityModule( connectivityModule ) , mGreengrassClientWrapper( greengrassClientWrapper ) , mTopicConfig( topicConfig ) { } bool AwsGreengrassV2Sender::isAlive() { std::lock_guard<std::mutex> connectivityLock( mConnectivityMutex ); return isAliveNotThreadSafe(); } bool AwsGreengrassV2Sender::isAliveNotThreadSafe() { if ( mConnectivityModule == nullptr ) { return false; } return mConnectivityModule->isAlive(); } size_t AwsGreengrassV2Sender::getMaxSendSize() const { return AWS_IOT_MAX_MESSAGE_SIZE; } void AwsGreengrassV2Sender::sendBuffer( const std::string &topic, const uint8_t *buf, size_t size, OnDataSentCallback callback, QoS qos ) { std::lock_guard<std::mutex> connectivityLock( mConnectivityMutex ); if ( topic.empty() ) { FWE_LOG_WARN( "Invalid topic provided" ); callback( ConnectivityError::NotConfigured ); return; } if ( ( buf == nullptr ) || ( size == 0 ) ) { FWE_LOG_WARN( "No valid data provided" ); callback( ConnectivityError::WrongInputData ); return; } if ( size > getMaxSendSize() ) { FWE_LOG_WARN( "Payload provided is too long" ); callback( ConnectivityError::WrongInputData ); return; } if ( !isAliveNotThreadSafe() ) { FWE_LOG_WARN( "No alive IPC Connection." ); callback( ConnectivityError::NoConnection ); return; } if ( !AwsSDKMemoryManager::getInstance().reserveMemory( size ) ) { FWE_LOG_ERROR( "Not sending out the message with size " + std::to_string( size ) + " because IoT device SDK allocated the maximum defined memory." ); callback( ConnectivityError::QuotaReached ); return; } ReservedMemoryReleaser reservedMemoryReleaser( AwsSDKMemoryManager::getInstance(), size ); auto greengrassPublishQoS = Aws::Greengrass::QOS_AT_MOST_ONCE; switch ( qos ) { case QoS::AT_MOST_ONCE: greengrassPublishQoS = Aws::Greengrass::QOS_AT_MOST_ONCE; break; case QoS::AT_LEAST_ONCE: greengrassPublishQoS = Aws::Greengrass::QOS_AT_LEAST_ONCE; break; } auto publishOperation = mGreengrassClientWrapper.NewPublishToIoTCore(); Aws::Greengrass::PublishToIoTCoreRequest publishRequest; publishRequest.SetTopicName( topic.c_str() != nullptr ? topic.c_str() : "" ); Aws::Crt::Vector<uint8_t> payload( buf, buf + size ); publishRequest.SetPayload( payload ); publishRequest.SetQos( greengrassPublishQoS ); FWE_LOG_TRACE( "Attempting to publish to " + topic + " topic" ); auto requestStatus = publishOperation->Activate( publishRequest ).get(); if ( !requestStatus ) { auto errString = requestStatus.StatusToString(); FWE_LOG_ERROR( "Failed to publish to " + topic + " topic with error " + std::string( errString.c_str() != nullptr ? errString.c_str() : "Unknown error" ) ); callback( ConnectivityError::NoConnection ); return; } auto publishResultFuture = publishOperation->GetResult(); // To avoid throwing exceptions, wait on the result for a specified timeout: if ( publishResultFuture.wait_for( std::chrono::seconds( 10 ) ) == std::future_status::timeout ) { FWE_LOG_ERROR( "Timed out while waiting for response from Greengrass Core" ); callback( ConnectivityError::NoConnection ); return; } auto publishResult = publishResultFuture.get(); if ( !publishResult ) { FWE_LOG_ERROR( "Failed to publish to " + topic + " topic" ); auto errorType = publishResult.GetResultType(); if ( errorType == OPERATION_ERROR ) { OperationError *error = publishResult.GetOperationError(); /* * This pointer can be casted to any error type like so: * if(error->GetModelName() == UnauthorizedError::MODEL_NAME) * UnauthorizedError *unauthorizedError = static_cast<UnauthorizedError*>(error); */ if ( error->GetMessage().has_value() ) { auto errString = error->GetMessage().value(); FWE_LOG_ERROR( "Greengrass Core responded with an error: " + std::string( errString.c_str() ) ); } } else { auto errString = std::string( publishResult.GetRpcError().StatusToString().c_str() ); FWE_LOG_ERROR( "Attempting to receive the response from the server failed with error code " + errString ); } callback( ConnectivityError::TransmissionError ); return; } mPayloadCountSent++; callback( ConnectivityError::Success ); } } // namespace IoTFleetWise } // namespace Aws