src/ActuatorCommandManager.cpp (247 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#include "aws/iotfleetwise/ActuatorCommandManager.h"
#include "aws/iotfleetwise/Assert.h"
#include "aws/iotfleetwise/CollectionInspectionAPITypes.h"
#include "aws/iotfleetwise/ICommandDispatcher.h"
#include "aws/iotfleetwise/LoggingModule.h"
#include "aws/iotfleetwise/QueueTypes.h"
#include "aws/iotfleetwise/TraceModule.h"
#include <string>
#include <unordered_map>
#include <utility>
namespace Aws
{
namespace IoTFleetWise
{
ActuatorCommandManager::ActuatorCommandManager( std::shared_ptr<DataSenderQueue> commandResponses,
size_t maxConcurrentCommandRequests,
RawData::BufferManager *rawDataBufferManager )
: mMaxConcurrentCommandRequests( maxConcurrentCommandRequests )
, mCommandResponses( std::move( commandResponses ) )
, mRawDataBufferManager( rawDataBufferManager )
{
}
void
ActuatorCommandManager::onReceivingCommandRequest( const ActuatorCommandRequest &commandRequest )
{
std::lock_guard<std::mutex> lock( mCommandRequestMutex );
if ( mCommandRequestQueue.size() >= mMaxConcurrentCommandRequests )
{
FWE_LOG_ERROR( "Command Requests processing queue is full, could not ingest command request" );
return;
}
mCommandRequestQueue.push( commandRequest );
TraceModule::get().incrementAtomicVariable( TraceAtomicVariable::QUEUE_PENDING_COMMAND_REQUESTS );
FWE_LOG_TRACE( "New command request was handed over" );
mWait.notify();
}
bool
ActuatorCommandManager::registerDispatcher( const std::string &interfaceId,
std::shared_ptr<ICommandDispatcher> dispatcher )
{
auto registered = mInterfaceIDToCommandDispatcherMap.emplace( interfaceId, std::move( dispatcher ) ).second;
if ( !registered )
{
FWE_LOG_ERROR( "Dispatcher for interface ID " + interfaceId + " already registered" );
}
return registered;
}
std::unordered_map<InterfaceID, std::vector<std::string>>
ActuatorCommandManager::getActuatorNames()
{
std::unordered_map<InterfaceID, std::vector<std::string>> actuatorNames;
for ( const auto &interface : mInterfaceIDToCommandDispatcherMap )
{
actuatorNames.emplace( interface.first, interface.second->getActuatorNames() );
}
return actuatorNames;
}
bool
ActuatorCommandManager::start()
{
if ( mCommandResponses == nullptr )
{
FWE_LOG_ERROR( "No queue provided for the Command Responses" );
return false;
}
// Prevent concurrent stop/init
std::lock_guard<std::mutex> lock( mThreadMutex );
mShouldStop.store( false );
if ( !mThread.create( [this]() {
this->doWork();
} ) )
{
FWE_LOG_TRACE( "Command Manager Thread failed to start" );
}
else
{
FWE_LOG_TRACE( "Command Manager Thread started" );
mThread.setThreadName( "fwRCCommandMng" );
}
return mThread.isActive() && mThread.isValid();
}
void
ActuatorCommandManager::doWork()
{
// Initialize dispatchers on this thread to avoid delaying bootstrap. Commands may be received
// directly after connection to the cloud, which will be queued until this initialization loop
// has completed.
for ( auto interfaceDispatcherIt = mInterfaceIDToCommandDispatcherMap.begin();
( interfaceDispatcherIt != mInterfaceIDToCommandDispatcherMap.end() ) && ( !shouldStop() );
interfaceDispatcherIt++ )
{
FWE_GRACEFUL_FATAL_ASSERT( interfaceDispatcherIt->second->init(), "Fatal error initializing dispatcher", );
}
while ( !shouldStop() )
{
mWait.wait( Signal::WaitWithPredicate );
while ( !mCommandRequestQueue.empty() )
{
auto &commandRequest = mCommandRequestQueue.front();
processCommandRequest( commandRequest );
mCommandRequestQueue.pop();
TraceModule::get().decrementAtomicVariable( TraceAtomicVariable::QUEUE_PENDING_COMMAND_REQUESTS );
}
}
}
void
ActuatorCommandManager::processCommandRequest( const ActuatorCommandRequest &commandRequest )
{
FWE_LOG_TRACE( "Processing Command Request with ID: " + commandRequest.commandID );
if ( commandRequest.decoderID != mCurrentDecoderManifestID )
{
FWE_LOG_ERROR( "Decoder manifest sync id does not match with the decoder manifest used by the agent, cannot "
"process Command with ID " +
commandRequest.commandID );
queueCommandResponse( commandRequest,
CommandStatus::EXECUTION_FAILED,
REASON_CODE_DECODER_MANIFEST_OUT_OF_SYNC,
commandRequest.decoderID + " vs " + mCurrentDecoderManifestID );
return;
}
if ( mCustomSignalDecoderFormatMap == nullptr )
{
FWE_LOG_ERROR( "No Custom Signal Decoder Format map was provided, cannot process Command with ID " +
commandRequest.commandID );
queueCommandResponse( commandRequest,
CommandStatus::EXECUTION_FAILED,
REASON_CODE_DECODER_MANIFEST_OUT_OF_SYNC,
"No decoder manifest" );
return;
}
// Retrieve Custom Decoder ID from the decoder manifest
auto customSignalDecoderFormatIt = mCustomSignalDecoderFormatMap->find( commandRequest.signalID );
if ( customSignalDecoderFormatIt == mCustomSignalDecoderFormatMap->end() )
{
FWE_LOG_ERROR( "Command Signal Decoder Format not found for signal ID " +
std::to_string( commandRequest.signalID ) + ". Command with ID " + commandRequest.commandID +
" can not be processed." );
queueCommandResponse(
commandRequest, CommandStatus::EXECUTION_FAILED, REASON_CODE_NO_DECODING_RULES_FOUND, "" );
return;
}
const auto &interfaceId = customSignalDecoderFormatIt->second.mInterfaceId;
auto commandDispatcherIt = mInterfaceIDToCommandDispatcherMap.find( interfaceId );
if ( commandDispatcherIt == mInterfaceIDToCommandDispatcherMap.end() )
{
FWE_LOG_ERROR( "No command dispatcher found for signal ID " + std::to_string( commandRequest.signalID ) +
", interface ID " + interfaceId + ". Command with ID " + commandRequest.commandID +
" can not be processed." );
queueCommandResponse(
commandRequest, CommandStatus::EXECUTION_FAILED, REASON_CODE_NO_COMMAND_DISPATCHER_FOUND, "" );
return;
}
const auto &actuatorName = customSignalDecoderFormatIt->second.mDecoder;
// Timeout is already checked during command reception from the cloud, but check again here in case
// command dispatching is being run synchronously and a large delay has occurred since the command
// was received.
if ( ( commandRequest.executionTimeoutMs > 0 ) &&
( ( commandRequest.issuedTimestampMs + commandRequest.executionTimeoutMs ) <=
mClock->systemTimeSinceEpochMs() ) )
{
FWE_LOG_ERROR( "Command Request with ID " + commandRequest.commandID + " timed out" );
queueCommandResponse(
commandRequest, CommandStatus::EXECUTION_TIMEOUT, REASON_CODE_TIMED_OUT_BEFORE_DISPATCH, "" );
return;
}
// Send command request to the command dispatcher
commandDispatcherIt->second->setActuatorValue(
actuatorName,
commandRequest.signalValueWrapper,
commandRequest.commandID,
commandRequest.issuedTimestampMs,
commandRequest.executionTimeoutMs,
[this, commandRequest](
CommandStatus status, CommandReasonCode reasonCode, const CommandReasonDescription &reasonDescription ) {
queueCommandResponse( commandRequest, status, reasonCode, reasonDescription );
} );
}
void
ActuatorCommandManager::queueCommandResponse( const ActuatorCommandRequest &commandRequest,
CommandStatus commandStatus,
CommandReasonCode reasonCode,
const CommandReasonDescription &reasonDescription )
{
if ( commandRequest.signalValueWrapper.type == SignalType::STRING )
{
mRawDataBufferManager->decreaseHandleUsageHint( commandRequest.signalValueWrapper.value.rawDataVal.signalId,
commandRequest.signalValueWrapper.value.rawDataVal.handle,
RawData::BufferHandleUsageStage::UPLOADING );
}
// Emit metrics for command execution
if ( reasonCode == REASON_CODE_PRECONDITION_FAILED )
{
TraceModule::get().incrementVariable( TraceVariable::COMMAND_PRECONDITION_CHECK_FAILURE );
}
else if ( reasonCode == REASON_CODE_DECODER_MANIFEST_OUT_OF_SYNC )
{
TraceModule::get().incrementVariable( TraceVariable::COMMAND_DECODER_MANIFEST_FAILURE );
}
else if ( commandStatus == CommandStatus::EXECUTION_TIMEOUT )
{
TraceModule::get().incrementVariable( TraceVariable::COMMAND_EXECUTION_TIMEOUT );
}
else if ( commandStatus == CommandStatus::EXECUTION_FAILED )
{
TraceModule::get().incrementVariable( TraceVariable::COMMAND_EXECUTION_FAILURE );
}
else
{
// Do nothing
}
// coverity[check_return]
mCommandResponses->push(
std::make_shared<CommandResponse>( commandRequest.commandID, commandStatus, reasonCode, reasonDescription ) );
}
void
ActuatorCommandManager::onChangeOfCustomSignalDecoderFormatMap(
const SyncID ¤tDecoderManifestID,
const SignalIDToCustomSignalDecoderFormatMapPtr &customSignalDecoderFormatMap )
{
std::lock_guard<std::mutex> lock( mCustomSignalDecoderFormatMapUpdateMutex );
mCustomSignalDecoderFormatMap = customSignalDecoderFormatMap;
mCurrentDecoderManifestID = currentDecoderManifestID;
FWE_LOG_TRACE( "Custom Signal Decoder Format Map was handed over to the Command Manager" );
}
bool
ActuatorCommandManager::shouldStop() const
{
return mShouldStop.load( std::memory_order_relaxed );
}
bool
ActuatorCommandManager::stop()
{
if ( ( !mThread.isValid() ) || ( !mThread.isActive() ) )
{
return true;
}
std::lock_guard<std::mutex> lock( mThreadMutex );
mShouldStop.store( true, std::memory_order_relaxed );
FWE_LOG_TRACE( "Request stop" );
mWait.notify();
mThread.release();
FWE_LOG_TRACE( "Stop finished" );
mShouldStop.store( false, std::memory_order_relaxed );
return !mThread.isActive();
}
bool
ActuatorCommandManager::isAlive()
{
return mThread.isValid() && mThread.isActive();
}
ActuatorCommandManager::~ActuatorCommandManager()
{
// To make sure the thread stops during teardown of tests.
if ( isAlive() )
{
stop();
}
}
} // namespace IoTFleetWise
} // namespace Aws