src/IoTFleetWiseEngine.cpp (1,716 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 #include "aws/iotfleetwise/IoTFleetWiseEngine.h" #include "aws/iotfleetwise/AwsBootstrap.h" #include "aws/iotfleetwise/AwsIotConnectivityModule.h" #include "aws/iotfleetwise/AwsSDKMemoryManager.h" #include "aws/iotfleetwise/CollectionInspectionAPITypes.h" #include "aws/iotfleetwise/ConsoleLogger.h" #include "aws/iotfleetwise/DataSenderManager.h" #include "aws/iotfleetwise/DataSenderProtoWriter.h" #include "aws/iotfleetwise/DataSenderTypes.h" #include "aws/iotfleetwise/ILogger.h" #include "aws/iotfleetwise/IoTFleetWiseVersion.h" #include "aws/iotfleetwise/LogLevel.h" #include "aws/iotfleetwise/LoggingModule.h" #include "aws/iotfleetwise/MqttClientWrapper.h" #include "aws/iotfleetwise/QueueTypes.h" #include "aws/iotfleetwise/TelemetryDataSender.h" #include "aws/iotfleetwise/TraceModule.h" #include <algorithm> #include <aws/crt/Api.h> #include <aws/crt/Types.h> #include <aws/iot/Mqtt5Client.h> #include <boost/optional/optional.hpp> #include <cstdlib> #include <exception> #include <fstream> #include <functional> #include <memory> #include <unordered_map> #include <utility> #ifdef FWE_FEATURE_GREENGRASSV2 #include "aws/iotfleetwise/AwsGreengrassCoreIpcClientWrapper.h" #include "aws/iotfleetwise/AwsGreengrassV2ConnectivityModule.h" #include <aws/greengrass/GreengrassCoreIpcClient.h> #ifdef FWE_FEATURE_S3 #include <aws/core/auth/AWSCredentialsProviderChain.h> #endif #endif #ifdef FWE_FEATURE_S3 #include "aws/iotfleetwise/Credentials.h" #include "aws/iotfleetwise/TransferManagerWrapper.h" // IWYU pragma: keep #include <aws/core/client/ClientConfiguration.h> // IWYU pragma: keep #include <aws/s3/S3Client.h> // IWYU pragma: keep #include <aws/s3/S3ServiceClientModel.h> // IWYU pragma: keep #include <aws/transfer/TransferManager.h> // IWYU pragma: keep #endif #ifdef FWE_FEATURE_VISION_SYSTEM_DATA #include "aws/iotfleetwise/DataSenderIonWriter.h" #include "aws/iotfleetwise/VisionSystemDataSender.h" #endif #ifdef FWE_FEATURE_REMOTE_COMMANDS #include "aws/iotfleetwise/CommandResponseDataSender.h" #include "aws/iotfleetwise/SignalTypes.h" #endif #ifdef FWE_FEATURE_SOMEIP #include "aws/iotfleetwise/ExampleSomeipInterfaceWrapper.h" #include "v1/commonapi/DeviceShadowOverSomeipInterface.hpp" #include "v1/commonapi/ExampleSomeipInterfaceProxy.hpp" #include <CommonAPI/CommonAPI.hpp> #include <vsomeip/vsomeip.hpp> #endif #ifdef FWE_FEATURE_LAST_KNOWN_STATE #include "aws/iotfleetwise/LastKnownStateDataSender.h" #include "aws/iotfleetwise/LastKnownStateInspector.h" #endif #ifdef FWE_FEATURE_STORE_AND_FORWARD #include "aws/iotfleetwise/snf/RateLimiter.h" #include "aws/iotfleetwise/snf/StreamForwarder.h" #endif #ifdef FWE_FEATURE_CUSTOM_FUNCTION_EXAMPLES #include "aws/iotfleetwise/CustomFunctionMath.h" #endif #ifdef FWE_FEATURE_UDS_DTC_EXAMPLE #include <stdexcept> #endif #ifdef FWE_FEATURE_SCRIPT_ENGINE #include <aws/s3/model/GetObjectRequest.h> #endif extern "C" { // coverity[cert_msc54_cpp_violation] False positive - function does have C linkage static void signalHandler( int signum ) { // Very few things are safe in a signal handler. So we never do anything other than set the atomic int, not even // print a message: https://stackoverflow.com/a/16891799 Aws::IoTFleetWise::gSignal = signum; } // coverity[cert_msc54_cpp_violation] False positive - function does have C linkage static void segFaultHandler( int signum ) { static_cast<void>( signum ); // SIGSEGV handlers should never return. We have to abort: // https://wiki.sei.cmu.edu/confluence/display/c/SIG35-C.+Do+not+return+from+a+computational+exception+signal+handler // coverity[autosar_cpp14_m18_0_3_violation] // coverity[misra_cpp_2008_rule_18_0_3_violation] abort(); } // coverity[cert_msc54_cpp_violation] False positive - function does have C linkage static void abortHandler( int signum ) { static_cast<void>( signum ); // Very few things are safe in a signal handler. Flushing streams isn't normally safe, // unless we can guarantee that nothing is currently using the stream: // https://www.gnu.org/software/libc/manual/html_node/Nonreentrancy.html // So we use an atomic int (signal handler safe) to check whether the program stopped while // in the middle of a log call. Assuming that we are not using the log stream (stdout) // directly anywhere else, flushing should be safe here. if ( Aws::IoTFleetWise::gOngoingLogMessage == 0 ) { Aws::IoTFleetWise::LoggingModule::flush(); } } } namespace Aws { namespace IoTFleetWise { // coverity[autosar_cpp14_a2_11_1_violation] volatile required as it will be modified by a signal handler volatile sig_atomic_t gSignal = 0; // NOLINT Global signal static constexpr uint64_t DEFAULT_RETRY_UPLOAD_PERSISTED_INTERVAL_MS = 10000; static constexpr uint64_t DEFAULT_FETCH_QUEUE_SIZE = 1000; static const std::string CAN_INTERFACE_TYPE = "canInterface"; static const std::string EXTERNAL_CAN_INTERFACE_TYPE = "externalCanInterface"; static const std::string OBD_INTERFACE_TYPE = "obdInterface"; static const std::string NAMED_SIGNAL_INTERFACE_TYPE = "namedSignalInterface"; #ifdef FWE_FEATURE_ROS2 static const std::string ROS2_INTERFACE_TYPE = "ros2Interface"; #endif #ifdef FWE_FEATURE_SOMEIP static const std::string SOMEIP_TO_CAN_BRIDGE_INTERFACE_TYPE = "someipToCanBridgeInterface"; static const std::string SOMEIP_COLLECTION_INTERFACE_TYPE = "someipCollectionInterface"; #endif #ifdef FWE_FEATURE_REMOTE_COMMANDS #ifdef FWE_FEATURE_SOMEIP static const std::string SOMEIP_COMMAND_INTERFACE_TYPE = "someipCommandInterface"; #endif static const std::string CAN_COMMAND_INTERFACE_TYPE = "canCommandInterface"; static const std::unordered_map<std::string, CanCommandDispatcher::CommandConfig> EXAMPLE_CAN_INTERFACE_SUPPORTED_ACTUATOR_MAP = { { "Vehicle.actuator6", { 0x00000123, 0x00000456, SignalType::INT32 } }, { "Vehicle.actuator7", { 0x80000789, 0x80000ABC, SignalType::DOUBLE } }, }; #endif #ifdef FWE_FEATURE_IWAVE_GPS static const std::string IWAVE_GPS_INTERFACE_TYPE = "iWaveGpsInterface"; #endif #ifdef FWE_FEATURE_EXTERNAL_GPS static const std::string EXTERNAL_GPS_INTERFACE_TYPE = "externalGpsInterface"; #endif #ifdef FWE_FEATURE_AAOS_VHAL static const std::string AAOS_VHAL_INTERFACE_TYPE = "aaosVhalInterface"; #endif #ifdef FWE_FEATURE_UDS_DTC_EXAMPLE static const std::string UDS_DTC_INTERFACE = "exampleUDSInterface"; #endif namespace { /** * @brief Get the File Contents including whitespace characters * * @param p The file path * @return std::string File contents */ std::string getFileContents( const std::string &p ) { constexpr auto NUM_CHARS = 1; std::string ret; std::ifstream fs{ p }; // False alarm: uninit_use_in_call: Using uninitialized value "fs._M_streambuf_state" when calling "good". // coverity[uninit_use_in_call : SUPPRESS] while ( fs.good() ) { auto c = static_cast<char>( fs.get() ); ret.append( NUM_CHARS, c ); } return ret; } /** * @brief Get the absolute file path, if the path is already absolute its returned. * * @param p The file path * @param basePath Base path to which the p is relative * @return boost::filesystem::path Absolute file path */ boost::filesystem::path getAbsolutePath( const std::string &p, const boost::filesystem::path &basePath ) { boost::filesystem::path filePath( p ); if ( !filePath.is_absolute() ) { return basePath / filePath; } return filePath; } } // namespace void IoTFleetWiseEngine::configureSignalHandlers() { signal( SIGINT, signalHandler ); signal( SIGTERM, signalHandler ); signal( SIGUSR1, signalHandler ); signal( SIGSEGV, segFaultHandler ); // Mainly to handle when a thread is terminated due to uncaught exception signal( SIGABRT, abortHandler ); // Ignore SIGPIPE, as per // https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/basic-use.html#sdk-setting-options // coverity[misra_cpp_2008_rule_5_2_9_violation] Using SIG_IGN is the standard method to ignore signals // coverity[autosar_cpp14_m5_2_9_violation] Using SIG_IGN is the standard method to ignore signals signal( SIGPIPE, SIG_IGN ); // NOLINT } std::string IoTFleetWiseEngine::getVersion() { return "FWE Version: " + std::string( &FWE_VERSION_PROJECT_VERSION[0] ) + ", git tag: " + std::string( &FWE_VERSION_GIT_TAG[0] ) + ", git commit sha: " + std::string( &FWE_VERSION_GIT_COMMIT_SHA[0] ) + ", Build time: " + std::string( &FWE_VERSION_BUILD_TIME[0] ); } void IoTFleetWiseEngine::configureLogging( const Json::Value &config ) { auto logLevel = LogLevel::Trace; stringToLogLevel( config["staticConfig"]["internalParameters"]["systemWideLogLevel"].asString(), logLevel ); gSystemWideLogLevel = logLevel; auto logColorOption = LogColorOption::Auto; if ( config["staticConfig"]["internalParameters"].isMember( "logColor" ) ) { std::string logColorConfig = config["staticConfig"]["internalParameters"]["logColor"].asString(); if ( !stringToLogColorOption( logColorConfig, logColorOption ) ) { FWE_LOG_ERROR( "Invalid logColor config: " + logColorConfig ); } } gLogColorOption = logColorOption; } int IoTFleetWiseEngine::signalToExitCode( int signalNumber ) { switch ( signalNumber ) { case SIGUSR1: FWE_LOG_ERROR( "Fatal error, stopping" ); return -1; case SIGINT: case SIGTERM: FWE_LOG_INFO( "Stopping" ); return 0; default: FWE_LOG_WARN( "Received unexpected signal " + std::to_string( signalNumber ) ); return 0; } } #ifdef FWE_FEATURE_S3 std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> IoTFleetWiseEngine::getTransferManagerExecutor() { std::lock_guard<std::mutex> lock( mTransferManagerExecutorMutex ); if ( mTransferManagerExecutor == nullptr ) { mTransferManagerExecutor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>( "executor", 25 ); } return mTransferManagerExecutor; } #endif IoTFleetWiseEngine::IoTFleetWiseEngine() { TraceModule::get().sectionBegin( TraceSection::FWE_STARTUP ); } IoTFleetWiseEngine::~IoTFleetWiseEngine() { // To make sure the thread stops during teardown of tests. if ( isAlive() ) { stop(); } setLogForwarding( nullptr ); } #ifdef FWE_FEATURE_SOMEIP static std::shared_ptr<ExampleSomeipInterfaceWrapper> createExampleSomeipInterfaceWrapper( const std::string &applicationName, const std::string &exampleInstance, RawData::BufferManager *rawDataBufferManager, bool subscribeToLongRunningCommandStatus ) { return std::make_shared<ExampleSomeipInterfaceWrapper>( "local", exampleInstance, applicationName, []( std::string domain, std::string instance, std::string connection ) -> std::shared_ptr<v1::commonapi::ExampleSomeipInterfaceProxy<>> { return CommonAPI::Runtime::get()->buildProxy<v1::commonapi::ExampleSomeipInterfaceProxy>( domain, instance, connection ); }, rawDataBufferManager, subscribeToLongRunningCommandStatus ); } #endif bool IoTFleetWiseEngine::connect( const Json::Value &jsonConfig, const boost::filesystem::path &configFileDirectoryPath ) { // Main bootstrap sequence. try { IoTFleetWiseConfig config( jsonConfig ); uint64_t persistencyUploadRetryIntervalMs = 0; if ( ( config["staticConfig"].isMember( "persistency" ) ) ) { const auto persistencyPath = config["staticConfig"]["persistency"]["persistencyPath"].asStringRequired(); /*************************Payload Manager and Persistency library bootstrap begin*********/ // Create an object for Persistency mCacheAndPersist = std::make_shared<CacheAndPersist>( getAbsolutePath( persistencyPath, configFileDirectoryPath ).string(), config["staticConfig"]["persistency"]["persistencyPartitionMaxSize"].asSizeRequired() ); if ( !mCacheAndPersist->init() ) { FWE_LOG_ERROR( "Failed to init persistency library" ); } persistencyUploadRetryIntervalMs = config["staticConfig"]["persistency"]["persistencyUploadRetryIntervalMs"].asU64Optional().get_value_or( DEFAULT_RETRY_UPLOAD_PERSISTED_INTERVAL_MS ); // Payload Manager for offline data management mPayloadManager = std::make_shared<PayloadManager>( mCacheAndPersist ); } else { FWE_LOG_INFO( "Persistency feature is disabled in the configuration." ); #ifdef FWE_FEATURE_STORE_AND_FORWARD FWE_LOG_INFO( "Disabling Store and Forward feature as persistency is disabled." ); mStoreAndForwardEnabled = false; #endif } /*************************Payload Manager and Persistency library bootstrap end************/ /*************************CAN InterfaceID to InternalID Translator begin*********/ for ( unsigned i = 0; i < config["networkInterfaces"].getArraySizeRequired(); i++ ) { auto networkInterface = config["networkInterfaces"][i]; auto networkInterfaceType = networkInterface["type"].asStringRequired(); if ( ( networkInterfaceType == CAN_INTERFACE_TYPE ) || ( networkInterfaceType == EXTERNAL_CAN_INTERFACE_TYPE ) #ifdef FWE_FEATURE_SOMEIP || ( networkInterfaceType == SOMEIP_TO_CAN_BRIDGE_INTERFACE_TYPE ) #endif ) { mCANIDTranslator.add( networkInterface["interfaceId"].asStringRequired() ); } } /*************************CAN InterfaceID to InternalID Translator end*********/ /**************************Connectivity bootstrap begin*******************************/ // Pass on the AWS SDK Bootstrap handle to the IoTModule. auto awsSdkLogLevel = AwsBootstrap::logLevelFromString( config["staticConfig"]["internalParameters"]["awsSdkLogLevel"].asStringOptional().get_value_or( "Warn" ) ); auto bootstrapPtr = AwsBootstrap::getInstance( awsSdkLogLevel ).getClientBootStrap(); std::size_t maxAwsSdkHeapMemoryBytes = 0U; if ( config["staticConfig"]["internalParameters"].isMember( "maximumAwsSdkHeapMemoryBytes" ) ) { maxAwsSdkHeapMemoryBytes = config["staticConfig"]["internalParameters"]["maximumAwsSdkHeapMemoryBytes"].asSizeRequired(); if ( ( maxAwsSdkHeapMemoryBytes != 0U ) && AwsSDKMemoryManager::getInstance().setLimit( maxAwsSdkHeapMemoryBytes ) ) { FWE_LOG_INFO( "Maximum AWS SDK Heap Memory Bytes has been configured:" + std::to_string( maxAwsSdkHeapMemoryBytes ) ); } else { FWE_LOG_TRACE( "Maximum AWS SDK Heap Memory Bytes will use default value" ); } } else { FWE_LOG_TRACE( "Maximum AWS SDK Heap Memory Bytes will use default value" ); } auto mqttConfig = config["staticConfig"]["mqttConnection"]; auto clientId = mqttConfig["clientId"].asStringRequired(); std::string connectionType = mqttConfig["connectionType"].asStringOptional().get_value_or( "iotCore" ); TopicConfigArgs topicConfigArgs; topicConfigArgs.iotFleetWisePrefix = mqttConfig["iotFleetWiseTopicPrefix"].asStringOptional(); topicConfigArgs.commandsPrefix = mqttConfig["commandsTopicPrefix"].asStringOptional(); topicConfigArgs.deviceShadowPrefix = mqttConfig["deviceShadowTopicPrefix"].asStringOptional(); topicConfigArgs.jobsPrefix = mqttConfig["jobsTopicPrefix"].asStringOptional(); topicConfigArgs.metricsTopic = mqttConfig["metricsUploadTopic"].asStringOptional().get_value_or( "" ); topicConfigArgs.logsTopic = mqttConfig["loggingUploadTopic"].asStringOptional().get_value_or( "" ); mTopicConfig = std::make_unique<TopicConfig>( clientId, topicConfigArgs ); if ( connectionType == "iotCore" ) { std::string privateKey; std::string certificate; std::string rootCA; FWE_LOG_INFO( "ConnectionType is iotCore" ); // fetch connection parameters from config if ( mqttConfig.isMember( "privateKey" ) ) { privateKey = mqttConfig["privateKey"].asStringRequired(); } else if ( mqttConfig.isMember( "privateKeyFilename" ) ) { auto privKeyPathAbs = getAbsolutePath( mqttConfig["privateKeyFilename"].asStringRequired(), configFileDirectoryPath ) .string(); privateKey = getFileContents( privKeyPathAbs ); } if ( mqttConfig.isMember( "certificate" ) ) { certificate = mqttConfig["certificate"].asStringRequired(); } else if ( mqttConfig.isMember( "certificateFilename" ) ) { auto certPathAbs = getAbsolutePath( mqttConfig["certificateFilename"].asStringRequired(), configFileDirectoryPath ) .string(); certificate = getFileContents( certPathAbs ); } if ( mqttConfig.isMember( "rootCA" ) ) { rootCA = mqttConfig["rootCA"].asStringRequired(); } else if ( mqttConfig.isMember( "rootCAFilename" ) ) { auto rootCAPathAbs = getAbsolutePath( mqttConfig["rootCAFilename"].asStringRequired(), configFileDirectoryPath ) .string(); rootCA = getFileContents( rootCAPathAbs ); } // coverity[autosar_cpp14_a20_8_5_violation] - can't use make_unique as the constructor is private auto builder = std::unique_ptr<Aws::Iot::Mqtt5ClientBuilder>( Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromMemory( mqttConfig["endpointUrl"].asStringRequired().c_str(), Crt::ByteCursorFromCString( certificate.c_str() ), Crt::ByteCursorFromCString( privateKey.c_str() ) ) ); if ( builder == nullptr ) { FWE_LOG_ERROR( "Failed to setup mqtt5 client builder with error code " + std::to_string( Aws::Crt::LastError() ) + ": " + Aws::Crt::ErrorDebugString( Aws::Crt::LastError() ) ); return false; } else { builder->WithBootstrap( bootstrapPtr ); mBuilderWrapper = std::make_unique<MqttClientBuilderWrapper>( std::move( builder ) ); } AwsIotConnectivityConfig mqttConnectionConfig; mqttConnectionConfig.keepAliveIntervalSeconds = static_cast<uint16_t>( mqttConfig["keepAliveIntervalSeconds"].asU32Optional().get_value_or( MQTT_KEEP_ALIVE_INTERVAL_SECONDS ) ); mqttConnectionConfig.pingTimeoutMs = mqttConfig["pingTimeoutMs"].asU32Optional().get_value_or( MQTT_PING_TIMEOUT_MS ); mqttConnectionConfig.sessionExpiryIntervalSeconds = mqttConfig["sessionExpiryIntervalSeconds"].asU32Optional().get_value_or( MQTT_SESSION_EXPIRY_INTERVAL_SECONDS ); mConnectivityModule = std::make_shared<AwsIotConnectivityModule>( rootCA, clientId, *mBuilderWrapper, *mTopicConfig, mqttConnectionConfig ); #ifdef FWE_FEATURE_S3 if ( config["staticConfig"].isMember( "credentialsProvider" ) ) { auto crtCredentialsProvider = createX509CredentialsProvider( bootstrapPtr, clientId, privateKey, certificate, config["staticConfig"]["credentialsProvider"]["endpointUrl"].asStringRequired(), config["staticConfig"]["credentialsProvider"]["roleAlias"].asStringRequired() ); mAwsCredentialsProvider = std::make_shared<CrtCredentialsProviderAdapter>( crtCredentialsProvider ); } #endif } #ifdef FWE_FEATURE_GREENGRASSV2 else if ( connectionType == "iotGreengrassV2" ) { FWE_LOG_INFO( "ConnectionType is iotGreengrassV2" ); mGreengrassClient = std::make_unique<Aws::Greengrass::GreengrassCoreIpcClient>( *bootstrapPtr ); mGreengrassClientWrapper = std::make_unique<AwsGreengrassCoreIpcClientWrapper>( mGreengrassClient.get() ); mConnectivityModule = std::make_shared<AwsGreengrassV2ConnectivityModule>( *mGreengrassClientWrapper, *mTopicConfig ); #ifdef FWE_FEATURE_S3 mAwsCredentialsProvider = std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); #endif } #endif else if ( ( mConnectivityModuleConfigHook != nullptr ) && mConnectivityModuleConfigHook( config ) ) { // External connectivity module was configured } else { FWE_LOG_ERROR( "Unknown connection type: " + connectionType ); return false; } mReceiverCollectionSchemeList = mConnectivityModule->createReceiver( mTopicConfig->collectionSchemesTopic ); mReceiverDecoderManifest = mConnectivityModule->createReceiver( mTopicConfig->decoderManifestTopic ); #ifdef FWE_FEATURE_STORE_AND_FORWARD if ( mStoreAndForwardEnabled ) { // Receivers to receive Store and Forward Data Upload Requests mReceiverIotJob = mConnectivityModule->createReceiver( mTopicConfig->jobNotificationTopic ); mReceiverJobDocumentAccepted = mConnectivityModule->createReceiver( mTopicConfig->getJobExecutionAcceptedTopic ); mReceiverJobDocumentRejected = mConnectivityModule->createReceiver( mTopicConfig->getJobExecutionRejectedTopic ); mReceiverPendingJobsAccepted = mConnectivityModule->createReceiver( mTopicConfig->getPendingJobExecutionsAcceptedTopic ); mReceiverPendingJobsRejected = mConnectivityModule->createReceiver( mTopicConfig->getPendingJobExecutionsRejectedTopic ); mReceiverUpdateIotJobStatusAccepted = mConnectivityModule->createReceiver( mTopicConfig->updateJobExecutionAcceptedTopic ); mReceiverUpdateIotJobStatusRejected = mConnectivityModule->createReceiver( mTopicConfig->updateJobExecutionRejectedTopic ); mReceiverCanceledIoTJobs = mConnectivityModule->createReceiver( mTopicConfig->jobCancellationInProgressTopic ); } #endif mMqttSender = mConnectivityModule->createSender(); #ifdef FWE_FEATURE_REMOTE_COMMANDS std::shared_ptr<IReceiver> receiverCommandRequest; std::shared_ptr<IReceiver> receiverRejectedCommandResponse; std::shared_ptr<IReceiver> receiverAcceptedCommandResponse; receiverCommandRequest = mConnectivityModule->createReceiver( mTopicConfig->commandRequestTopic ); // The accepted/rejected messages are always sent regardless of whether we are subscribing to the topics or // not. So even if we don't need to receive them, we subscribe to them just to ensure we don't log any // error. receiverAcceptedCommandResponse = mConnectivityModule->createReceiver( mTopicConfig->commandResponseAcceptedTopic ); receiverRejectedCommandResponse = mConnectivityModule->createReceiver( mTopicConfig->commandResponseRejectedTopic ); #endif #ifdef FWE_FEATURE_LAST_KNOWN_STATE std::shared_ptr<IReceiver> receiverLastKnownStateConfig = mConnectivityModule->createReceiver( mTopicConfig->lastKnownStateConfigTopic ); #endif #ifdef FWE_FEATURE_SOMEIP if ( !config["staticConfig"].isMember( "deviceShadowOverSomeip" ) ) { FWE_LOG_TRACE( "DeviceShadowOverSomeip is disabled as no deviceShadowOverSomeip member in staticConfig" ); } else { std::shared_ptr<IReceiver> receiverDeviceShadow = mConnectivityModule->createReceiver( mTopicConfig->deviceShadowPrefix + "#" ); mDeviceShadowOverSomeip = std::make_shared<DeviceShadowOverSomeip>( *mMqttSender ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda receiverDeviceShadow->subscribeToDataReceived( std::bind( &DeviceShadowOverSomeip::onDataReceived, mDeviceShadowOverSomeip.get(), std::placeholders::_1 ) ); mDeviceShadowOverSomeipInstanceName = config["staticConfig"]["deviceShadowOverSomeip"]["someipInstance"].asStringOptional().get_value_or( "commonapi.DeviceShadowOverSomeipInterface" ); if ( !CommonAPI::Runtime::get()->registerService( "local", mDeviceShadowOverSomeipInstanceName, mDeviceShadowOverSomeip, config["staticConfig"]["deviceShadowOverSomeip"]["someipApplicationName"].asStringRequired() ) ) { FWE_LOG_ERROR( "Failed to register DeviceShadowOverSomeip service" ); return false; } } #endif boost::optional<RawData::BufferManagerConfig> rawDataBufferManagerConfig; auto rawDataBufferJsonConfig = config["staticConfig"]["visionSystemDataCollection"]["rawDataBuffer"]; auto rawBufferSize = rawDataBufferJsonConfig["maxSize"].asSizeOptional(); if ( rawBufferSize.get_value_or( SIZE_MAX ) > 0 ) { // Create a Raw Data Buffer Manager std::vector<RawData::SignalBufferOverrides> rawDataBufferOverridesPerSignal; for ( auto i = 0U; i < rawDataBufferJsonConfig["overridesPerSignal"].getArraySizeOptional(); i++ ) { auto signalOverridesJson = rawDataBufferJsonConfig["overridesPerSignal"][i]; RawData::SignalBufferOverrides signalOverrides; signalOverrides.interfaceId = signalOverridesJson["interfaceId"].asStringRequired(); signalOverrides.messageId = signalOverridesJson["messageId"].asStringRequired(); signalOverrides.reservedBytes = signalOverridesJson["reservedSize"].asSizeOptional(); signalOverrides.maxNumOfSamples = signalOverridesJson["maxSamples"].asSizeOptional(); signalOverrides.maxBytesPerSample = signalOverridesJson["maxSizePerSample"].asSizeOptional(); signalOverrides.maxBytes = signalOverridesJson["maxSize"].asSizeOptional(); rawDataBufferOverridesPerSignal.emplace_back( signalOverrides ); } rawDataBufferManagerConfig = RawData::BufferManagerConfig::create( rawBufferSize, rawDataBufferJsonConfig["reservedSizePerSignal"].asSizeOptional(), rawDataBufferJsonConfig["maxSamplesPerSignal"].asSizeOptional(), rawDataBufferJsonConfig["maxSizePerSample"].asSizeOptional(), rawDataBufferJsonConfig["maxSizePerSignal"].asSizeOptional(), rawDataBufferOverridesPerSignal ); if ( !rawDataBufferManagerConfig ) { FWE_LOG_ERROR( "Failed to create raw data buffer manager config" ); return false; } mRawDataBufferManager = std::make_unique<RawData::BufferManager>( rawDataBufferManagerConfig.get() ); } /*************************Connectivity bootstrap end***************************************/ /*************************Remote Profiling bootstrap begin**********************************/ if ( config["staticConfig"].isMember( "remoteProfilerDefaultValues" ) ) { LogLevel logThreshold = LogLevel::Off; /* * logging-upload-level-threshold specifies which log messages normally output to STDOUT are also * uploaded over MQTT. Default is OFF which means no messages are uploaded. If its for example * "Warning" all log messages with this or a higher log level are mirrored over MQTT */ stringToLogLevel( config["staticConfig"]["remoteProfilerDefaultValues"]["loggingUploadLevelThreshold"].asStringRequired(), logThreshold ); /* * metrics-upload-interval-ms defines the interval in which all metrics should be uploaded * 0 means metrics upload is disabled which should be the default. Currently the metrics are * uploaded every given interval independent if the value changed or not * * logging-upload-max-wait-before-upload-ms to avoid to many separate mqtt messages the log messages * are aggregated and sent out delayed. The maximum allowed delay is specified here * * profiler-prefix metrics names uploaded will be prefixed with this string which could * be set different for every vehicle */ // These parameters need to be added to the Config file to enable the feature : // metricsUploadIntervalMs // loggingUploadMaxWaitBeforeUploadMs // profilerPrefix mRemoteProfiler = std::make_unique<RemoteProfiler>( *mMqttSender, config["staticConfig"]["remoteProfilerDefaultValues"]["metricsUploadIntervalMs"].asU32Required(), config["staticConfig"]["remoteProfilerDefaultValues"]["loggingUploadMaxWaitBeforeUploadMs"] .asU32Required(), logThreshold, config["staticConfig"]["remoteProfilerDefaultValues"]["profilerPrefix"].asStringRequired() ); setLogForwarding( mRemoteProfiler.get() ); } /*************************Remote Profiling bootstrap ends**********************************/ /*************************Inspection Engine bootstrap begin*********************************/ auto signalBufferSize = config["staticConfig"]["bufferSizes"]["decodedSignalsBufferSize"].asSizeRequired(); auto signalBuffer = std::make_shared<SignalBuffer>( signalBufferSize, "Signal Buffer", TraceAtomicVariable::QUEUE_CONSUMER_TO_INSPECTION_DATA_FRAMES, // Notify listeners when 10% of the buffer is full so that we don't // let it grow too much. signalBufferSize / 10 ); mSignalBufferDistributor.registerQueue( signalBuffer ); // Create the Data Inspection Queue mCollectedDataReadyToPublish = std::make_shared<DataSenderQueue>( config["staticConfig"]["internalParameters"]["readyToPublishDataBufferSize"].asSizeRequired(), "Collected Data", TraceAtomicVariable::QUEUE_INSPECTION_TO_SENDER ); #ifdef FWE_FEATURE_STORE_AND_FORWARD if ( mStoreAndForwardEnabled ) { const auto persistencyPath = config["staticConfig"]["persistency"]["persistencyPath"].asStringRequired(); mStreamManager = std::make_unique<Aws::IoTFleetWise::Store::StreamManager>( persistencyPath, std::make_unique<DataSenderProtoWriter>( mCANIDTranslator, mRawDataBufferManager.get() ), config["staticConfig"]["publishToCloudParameters"]["maxPublishMessageCount"].asU32Required() ); } #endif auto payloadConfigUncompressed = config["staticConfig"]["payloadAdaption"]["uncompressed"]; PayloadAdaptionConfig payloadAdaptionConfigUncompressed{ payloadConfigUncompressed["transmitThresholdStartPercent"].asU32Optional().get_value_or( 80 ), payloadConfigUncompressed["payloadSizeLimitMinPercent"].asU32Optional().get_value_or( 70 ), payloadConfigUncompressed["payloadSizeLimitMaxPercent"].asU32Optional().get_value_or( 90 ), payloadConfigUncompressed["transmitThresholdAdaptPercent"].asU32Optional().get_value_or( 10 ) }; auto payloadConfigCompressed = config["staticConfig"]["payloadAdaption"]["compressed"]; // Snappy typically compresses to around 30% of original size, so set the starting compressed transmit threshold // to double the maximum payload size: PayloadAdaptionConfig payloadAdaptionConfigCompressed{ payloadConfigCompressed["transmitThresholdStartPercent"].asU32Optional().get_value_or( 200 ), payloadConfigCompressed["payloadSizeLimitMinPercent"].asU32Optional().get_value_or( 70 ), payloadConfigCompressed["payloadSizeLimitMaxPercent"].asU32Optional().get_value_or( 90 ), payloadConfigCompressed["transmitThresholdAdaptPercent"].asU32Optional().get_value_or( 10 ) }; auto telemetryDataSender = std::make_unique<TelemetryDataSender>( *mMqttSender, std::make_unique<DataSenderProtoWriter>( mCANIDTranslator, mRawDataBufferManager.get() ), payloadAdaptionConfigUncompressed, payloadAdaptionConfigCompressed #ifdef FWE_FEATURE_STORE_AND_FORWARD , mStreamManager.get() #endif ); #ifdef FWE_FEATURE_STORE_AND_FORWARD if ( mStoreAndForwardEnabled ) { mRateLimiter = std::make_unique<RateLimiter>( config["staticConfig"]["storeAndForward"]["forwardMaxTokens"].asU32Optional().get_value_or( DEFAULT_MAX_TOKENS ), config["staticConfig"]["storeAndForward"]["forwardTokenRefillsPerSecond"].asU32Optional().get_value_or( DEFAULT_TOKEN_REFILLS_PER_SECOND ) ); mStreamForwarder = std::make_unique<Aws::IoTFleetWise::Store::StreamForwarder>( *mStreamManager, *telemetryDataSender, *mRateLimiter ); // Start the forwarder if ( !mStreamForwarder->start() ) { FWE_LOG_ERROR( "Failed to init and start the Stream Forwarder" ); return false; } } #endif mDataSenders.emplace( SenderDataType::TELEMETRY, std::move( telemetryDataSender ) ); mFetchQueue = std::make_shared<FetchRequestQueue>( config["staticConfig"]["internalParameters"]["maxFetchQueueSize"].asU32Optional().get_value_or( DEFAULT_FETCH_QUEUE_SIZE ), "Data Fetch Queue" ); auto minFetchTriggerIntervalMs = config["staticConfig"]["internalParameters"]["minFetchTriggerIntervalMs"].asU32Optional().get_value_or( MIN_FETCH_TRIGGER_MS ); mCollectionInspectionEngine = std::make_shared<CollectionInspectionEngine>( mRawDataBufferManager.get(), minFetchTriggerIntervalMs, mFetchQueue, true #ifdef FWE_FEATURE_STORE_AND_FORWARD , mStreamForwarder.get() #endif ); mCollectionInspectionWorkerThread = std::make_shared<CollectionInspectionWorkerThread>( *mCollectionInspectionEngine, signalBuffer, mCollectedDataReadyToPublish, config["staticConfig"]["threadIdleTimes"]["inspectionThreadIdleTimeMs"].asU32Required(), mRawDataBufferManager.get() ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda signalBuffer->subscribeToNewDataAvailable( std::bind( &CollectionInspectionWorkerThread::onNewDataAvailable, mCollectionInspectionWorkerThread.get() ) ); /*************************Inspection Engine bootstrap end***********************************/ /*************************Store and Forward IoT Jobs bootstrap begin************************/ #ifdef FWE_FEATURE_STORE_AND_FORWARD if ( mStoreAndForwardEnabled ) { mIoTJobsDataRequestHandler = std::make_unique<IoTJobsDataRequestHandler>( *mMqttSender, *mReceiverIotJob, *mReceiverJobDocumentAccepted, *mReceiverJobDocumentRejected, *mReceiverPendingJobsAccepted, *mReceiverPendingJobsRejected, *mReceiverUpdateIotJobStatusAccepted, *mReceiverUpdateIotJobStatusRejected, *mReceiverCanceledIoTJobs, *mStreamManager, *mStreamForwarder, clientId ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mConnectivityModule->subscribeToConnectionEstablished( std::bind( &IoTJobsDataRequestHandler::onConnectionEstablished, mIoTJobsDataRequestHandler.get() ) ); } #endif /*************************Store and Forward IoT Jobs bootstrap end**************************/ /*************************DataSender bootstrap begin*********************************/ #ifdef FWE_FEATURE_VISION_SYSTEM_DATA DataSenderIonWriter *ionWriter = nullptr; VisionSystemDataSender *visionSystemDataSender = nullptr; if ( ( mAwsCredentialsProvider == nullptr ) || ( !config["staticConfig"].isMember( "s3Upload" ) ) ) { FWE_LOG_INFO( "S3 sender not initialized so no vision-system-data data upload will be supported. Add " "'credentialsProvider' and 's3Upload' section to the config to initialize it." ) } else { auto s3MaxConnections = config["staticConfig"]["s3Upload"]["maxConnections"].asU32Required(); s3MaxConnections = s3MaxConnections > 0U ? s3MaxConnections : 1U; auto createTransferManagerWrapper = [this, s3MaxConnections]( Aws::Client::ClientConfiguration &clientConfiguration, Aws::Transfer::TransferManagerConfiguration &transferManagerConfiguration ) -> std::shared_ptr<TransferManagerWrapper> { clientConfiguration.maxConnections = s3MaxConnections; transferManagerConfiguration.transferExecutor = getTransferManagerExecutor().get(); auto s3Client = std::make_shared<Aws::S3::S3Client>( mAwsCredentialsProvider, Aws::MakeShared<Aws::S3::S3EndpointProvider>( "S3Client" ), clientConfiguration ); transferManagerConfiguration.s3Client = s3Client; return std::make_shared<TransferManagerWrapper>( Aws::Transfer::TransferManager::Create( transferManagerConfiguration ) ); }; mS3Sender = std::make_unique<S3Sender>( createTransferManagerWrapper, config["staticConfig"]["s3Upload"]["multipartSize"].asSizeRequired() ); auto ionWriterPtr = std::make_unique<DataSenderIonWriter>( mRawDataBufferManager.get(), clientId ); ionWriter = ionWriterPtr.get(); auto visionSystemDataSenderPtr = std::make_unique<VisionSystemDataSender>( *mCollectedDataReadyToPublish, *mS3Sender, std::move( ionWriterPtr ), clientId ); visionSystemDataSender = visionSystemDataSenderPtr.get(); mDataSenders.emplace( SenderDataType::VISION_SYSTEM, std::move( visionSystemDataSenderPtr ) ); } #endif #ifdef FWE_FEATURE_REMOTE_COMMANDS mCommandResponses = std::make_shared<DataSenderQueue>( config["staticConfig"]["internalParameters"]["readyToPublishCommandResponsesBufferSize"] .asSizeOptional() .get_value_or( 100 ), "Command Responses", TraceAtomicVariable::QUEUE_PENDING_COMMAND_RESPONSES ); size_t maxConcurrentCommandRequests = config["staticConfig"]["internalParameters"]["maxConcurrentCommandRequests"].asSizeOptional().get_value_or( 100 ); mActuatorCommandManager = std::make_shared<ActuatorCommandManager>( mCommandResponses, maxConcurrentCommandRequests, mRawDataBufferManager.get() ); mDataSenders.emplace( SenderDataType::COMMAND_RESPONSE, std::make_unique<CommandResponseDataSender>( *mMqttSender ) ); #endif #ifdef FWE_FEATURE_LAST_KNOWN_STATE mLastKnownStateDataReadyToPublish = std::make_shared<DataSenderQueue>( config["staticConfig"]["internalParameters"]["readyToPublishDataBufferSize"].asSizeRequired(), "LastKnownState data", TraceAtomicVariable::QUEUE_LAST_KNOWN_STATE_INSPECTION_TO_SENDER ); mDataSenders.emplace( SenderDataType::LAST_KNOWN_STATE, std::make_unique<LastKnownStateDataSender>( *mMqttSender, config["staticConfig"]["publishToCloudParameters"]["maxPublishLastKnownStateMessageCount"] .asU32Optional() .get_value_or( 1000 ) ) ); #endif std::vector<std::shared_ptr<DataSenderQueue>> dataToSendQueues = { #ifdef FWE_FEATURE_REMOTE_COMMANDS mCommandResponses, #endif #ifdef FWE_FEATURE_LAST_KNOWN_STATE mLastKnownStateDataReadyToPublish, #endif mCollectedDataReadyToPublish }; mDataSenderManagerWorkerThread = std::make_shared<DataSenderManagerWorkerThread>( *mConnectivityModule, std::make_unique<DataSenderManager>( mDataSenders, mPayloadManager.get() ), persistencyUploadRetryIntervalMs, dataToSendQueues ); if ( !mDataSenderManagerWorkerThread->start() ) { FWE_LOG_ERROR( "Failed to init and start the Data Sender" ); return false; } // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectedDataReadyToPublish->subscribeToNewDataAvailable( std::bind( &DataSenderManagerWorkerThread::onDataReadyToPublish, mDataSenderManagerWorkerThread.get() ) ); #ifdef FWE_FEATURE_LAST_KNOWN_STATE // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mLastKnownStateDataReadyToPublish->subscribeToNewDataAvailable( std::bind( &DataSenderManagerWorkerThread::onDataReadyToPublish, mDataSenderManagerWorkerThread.get() ) ); #endif /*************************DataSender bootstrap end*********************************/ /*************************CollectionScheme Ingestion bootstrap begin*********************************/ // CollectionScheme Ingestion module executes in the context for the offboardconnectivity thread. Upcoming // messages are expected to come either on the decoder manifest topic or the collectionScheme topic or both // ( eventually ). mSchemaPtr = std::make_shared<Schema>( *mReceiverDecoderManifest, *mReceiverCollectionSchemeList, *mMqttSender ); #ifdef FWE_FEATURE_LAST_KNOWN_STATE if ( receiverLastKnownStateConfig != nullptr ) { mLastKnownStateSchema = std::make_unique<LastKnownStateSchema>( *receiverLastKnownStateConfig ); } #endif /*****************************CollectionScheme Management bootstrap begin*****************************/ // Allow CollectionSchemeManagement to send checkins through the Schema Object Callback mCheckinSender = std::make_shared<CheckinSender>( mSchemaPtr, config["staticConfig"]["publishToCloudParameters"]["collectionSchemeManagementCheckinIntervalMs"] .asU32Required() ); // Create and connect the CollectionScheme Manager mCollectionSchemeManagerPtr = std::make_shared<CollectionSchemeManager>( mCacheAndPersist, mCANIDTranslator, mCheckinSender, mRawDataBufferManager.get() #ifdef FWE_FEATURE_REMOTE_COMMANDS , [this]() -> std::unordered_map<InterfaceID, std::vector<std::string>> { return mActuatorCommandManager->getActuatorNames(); } #endif , config["staticConfig"]["threadIdleTimes"]["collectionSchemeManagerThreadIdleTimeMs"] .asU32Optional() .get_value_or( 0 ) ); // Make sure the CollectionScheme Ingestion can notify the CollectionScheme Manager about the arrival // of new artifacts over the offboardconnectivity receiver. // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mSchemaPtr->subscribeToCollectionSchemeUpdate( std::bind( &CollectionSchemeManager::onCollectionSchemeUpdate, mCollectionSchemeManagerPtr.get(), std::placeholders::_1 ) ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mSchemaPtr->subscribeToDecoderManifestUpdate( std::bind( &CollectionSchemeManager::onDecoderManifestUpdate, mCollectionSchemeManagerPtr.get(), std::placeholders::_1 ) ); #ifdef FWE_FEATURE_LAST_KNOWN_STATE if ( mLastKnownStateSchema != nullptr ) { mLastKnownStateSchema->subscribeToLastKnownStateReceived( // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda std::bind( &CollectionSchemeManager::onStateTemplatesChanged, mCollectionSchemeManagerPtr.get(), std::placeholders::_1 ) ); } #endif // Make sure the CollectionScheme Manager can notify the Inspection Engine about the availability of // a new set of collection CollectionSchemes. // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToInspectionMatrixChange( std::bind( &CollectionInspectionWorkerThread::onChangeInspectionMatrix, mCollectionInspectionWorkerThread.get(), std::placeholders::_1 ) ); #ifdef FWE_FEATURE_VISION_SYSTEM_DATA // Make sure the CollectionScheme Manager can notify the Data Sender about the availability of // a new set of collection CollectionSchemes. if ( visionSystemDataSender != nullptr ) { // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToCollectionSchemeListChange( std::bind( &VisionSystemDataSender::onChangeCollectionSchemeList, visionSystemDataSender, std::placeholders::_1 ) ); } if ( ionWriter != nullptr ) { // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &DataSenderIonWriter::onChangeOfActiveDictionary, ionWriter, std::placeholders::_1, std::placeholders::_2 ) ); } #endif #ifdef FWE_FEATURE_STORE_AND_FORWARD if ( mStoreAndForwardEnabled ) { // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToCollectionSchemeListChange( std::bind( &Aws::IoTFleetWise::Store::StreamManager::onChangeCollectionSchemeList, mStreamManager.get(), std::placeholders::_1 ) ); } #endif /*************************DataFetchManager bootstrap begin*********************************/ mDataFetchManager = std::make_shared<DataFetchManager>( mFetchQueue ); mFetchQueue->subscribeToNewDataAvailable( std::bind( &DataFetchManager::onNewFetchRequestAvailable, mDataFetchManager.get() ) ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToFetchMatrixChange( std::bind( &DataFetchManager::onChangeFetchMatrix, mDataFetchManager.get(), std::placeholders::_1 ) ); /********************************Data source bootstrap start*******************************/ auto obdOverCANModuleInit = false; mCANDataConsumer = std::make_unique<CANDataConsumer>( mSignalBufferDistributor ); for ( unsigned i = 0; i < config["networkInterfaces"].getArraySizeRequired(); i++ ) { const auto networkInterfaceConfig = config["networkInterfaces"][i]; const auto interfaceType = networkInterfaceConfig["type"].asStringRequired(); const auto interfaceId = networkInterfaceConfig["interfaceId"].asStringRequired(); if ( interfaceType == CAN_INTERFACE_TYPE ) { CanTimestampType canTimestampType = CanTimestampType::KERNEL_SOFTWARE_TIMESTAMP; // default auto canConfig = networkInterfaceConfig[CAN_INTERFACE_TYPE]; if ( canConfig.isMember( "timestampType" ) ) { auto timestampTypeInput = canConfig["timestampType"].asStringRequired(); bool success = stringToCanTimestampType( timestampTypeInput, canTimestampType ); if ( !success ) { FWE_LOG_WARN( "Invalid can timestamp type provided: " + timestampTypeInput + " so default to Software" ); } } auto canChannelId = mCANIDTranslator.getChannelNumericID( interfaceId ); auto canSourcePtr = std::make_unique<CANDataSource>( canChannelId, canTimestampType, canConfig["interfaceName"].asStringRequired(), canConfig["protocolName"].asStringRequired() == "CAN-FD", config["staticConfig"]["threadIdleTimes"]["socketCANThreadIdleTimeMs"].asU32Required(), *mCANDataConsumer ); if ( !canSourcePtr->connect() ) { FWE_LOG_ERROR( "Failed to initialize CANDataSource" ); return false; } // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &CANDataSource::onChangeOfActiveDictionary, canSourcePtr.get(), std::placeholders::_1, std::placeholders::_2 ) ); mCANDataSources.push_back( std::move( canSourcePtr ) ); } else if ( interfaceType == OBD_INTERFACE_TYPE ) { if ( !obdOverCANModuleInit ) { auto obdConfig = networkInterfaceConfig[OBD_INTERFACE_TYPE]; mOBDOverCANModule = std::make_shared<OBDOverCANModule>( mSignalBufferDistributor, obdConfig["interfaceName"].asStringRequired(), obdConfig["pidRequestIntervalSeconds"].asU32Required(), obdConfig["dtcRequestIntervalSeconds"].asU32Required(), // Broadcast mode is enabled by default if not defined in config: obdConfig["broadcastRequests"].asBoolOptional().get_value_or( true ) ); obdOverCANModuleInit = true; // Connect the OBD Module if ( !mOBDOverCANModule->connect() ) { FWE_LOG_ERROR( "Failed to connect OBD over CAN module" ); return false; } // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &OBDOverCANModule::onChangeOfActiveDictionary, mOBDOverCANModule.get(), std::placeholders::_1, std::placeholders::_2 ) ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToInspectionMatrixChange( std::bind( &OBDOverCANModule::onChangeInspectionMatrix, mOBDOverCANModule.get(), std::placeholders::_1 ) ); } else { FWE_LOG_ERROR( "obdOverCANModule already initialised" ); } } else if ( interfaceType == EXTERNAL_CAN_INTERFACE_TYPE ) { if ( mExternalCANDataSource != nullptr ) { continue; } mExternalCANDataSource = std::make_unique<ExternalCANDataSource>( mCANIDTranslator, *mCANDataConsumer ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &ExternalCANDataSource::onChangeOfActiveDictionary, mExternalCANDataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); } else if ( interfaceType == NAMED_SIGNAL_INTERFACE_TYPE ) { if ( mNamedSignalDataSource != nullptr ) { continue; } mNamedSignalDataSource = std::make_shared<NamedSignalDataSource>( interfaceId, mSignalBufferDistributor ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &NamedSignalDataSource::onChangeOfActiveDictionary, mNamedSignalDataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); } #ifdef FWE_FEATURE_SOMEIP else if ( interfaceType == SOMEIP_COLLECTION_INTERFACE_TYPE ) { if ( mSomeipDataSource != nullptr ) { continue; } // coverity[autosar_cpp14_a20_8_4_violation] Shared pointer interface required for unit testing auto namedSignalDataSource = std::make_shared<NamedSignalDataSource>( interfaceId, mSignalBufferDistributor ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &NamedSignalDataSource::onChangeOfActiveDictionary, namedSignalDataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); auto someipCollectionInterfaceConfig = networkInterfaceConfig[SOMEIP_COLLECTION_INTERFACE_TYPE]; mSomeipDataSource = std::make_unique<SomeipDataSource>( createExampleSomeipInterfaceWrapper( someipCollectionInterfaceConfig["someipApplicationName"].asStringRequired(), someipCollectionInterfaceConfig["someipInstance"].asStringOptional().get_value_or( "commonapi.ExampleSomeipInterface" ), mRawDataBufferManager.get(), false ), std::move( namedSignalDataSource ), mRawDataBufferManager.get(), someipCollectionInterfaceConfig["cyclicUpdatePeriodMs"].asU32Required() ); if ( !mSomeipDataSource->connect() ) { FWE_LOG_ERROR( "Failed to initialize SOME/IP data source" ); return false; } } #endif #ifdef FWE_FEATURE_UDS_DTC_EXAMPLE else if ( interfaceType == UDS_DTC_INTERFACE ) { FWE_LOG_INFO( "UDS Template DTC Interface Type received" ); mDiagnosticNamedSignalDataSource = std::make_shared<NamedSignalDataSource>( interfaceId, mSignalBufferDistributor ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &NamedSignalDataSource::onChangeOfActiveDictionary, mDiagnosticNamedSignalDataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); std::vector<EcuConfig> remoteDiagnosticInterfaceConfig; auto ecuConfiguration = networkInterfaceConfig[UDS_DTC_INTERFACE]; for ( unsigned j = 0; j < networkInterfaceConfig[UDS_DTC_INTERFACE]["configs"].getArraySizeRequired(); j++ ) { auto ecu = ecuConfiguration["configs"][j]; auto canInterface = ecu["can"]; EcuConfig ecuConfig; ecuConfig.ecuName = static_cast<std::string>( ecu["name"].asStringRequired() ); ecuConfig.canBus = static_cast<std::string>( canInterface["interfaceName"].asStringRequired() ); try { ecuConfig.targetAddress = static_cast<int>( std::stoi( ecu["targetAddress"].asStringRequired(), nullptr, 0 ) ); ecuConfig.physicalRequestID = static_cast<uint32_t>( std::stoi( canInterface["physicalRequestID"].asStringRequired(), nullptr, 0 ) ); ecuConfig.physicalResponseID = static_cast<uint32_t>( std::stoi( canInterface["physicalResponseID"].asStringRequired(), nullptr, 0 ) ); ecuConfig.functionalAddress = static_cast<uint32_t>( std::stoi( canInterface["functionalAddress"].asStringRequired(), nullptr, 0 ) ); } catch ( const std::invalid_argument &err ) { FWE_LOG_ERROR( "Could not parse received remote diagnostics interface configuration: " + std::string( err.what() ) ); return false; } remoteDiagnosticInterfaceConfig.emplace_back( ecuConfig ); } mExampleDiagnosticInterface = std::make_shared<ExampleUDSInterface>( remoteDiagnosticInterfaceConfig ); if ( !mExampleDiagnosticInterface->start() ) { FWE_LOG_ERROR( "Failed to initialize the Template Interface" ); return false; } } #endif #ifdef FWE_FEATURE_ROS2 else if ( interfaceType == ROS2_INTERFACE_TYPE ) { ROS2DataSourceConfig ros2Config; if ( !ROS2DataSourceConfig::parseFromJson( networkInterfaceConfig, ros2Config ) ) { return false; } mROS2DataSource = std::make_shared<ROS2DataSource>( ros2Config, mSignalBufferDistributor, mRawDataBufferManager.get() ); mROS2DataSource->connect(); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &ROS2DataSource::onChangeOfActiveDictionary, mROS2DataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); } #endif #ifdef FWE_FEATURE_SOMEIP else if ( interfaceType == SOMEIP_TO_CAN_BRIDGE_INTERFACE_TYPE ) { auto canChannelId = mCANIDTranslator.getChannelNumericID( interfaceId ); auto someipToCanBridgeConfig = networkInterfaceConfig[SOMEIP_TO_CAN_BRIDGE_INTERFACE_TYPE]; auto bridge = std::make_unique<SomeipToCanBridge>( static_cast<uint16_t>( someipToCanBridgeConfig["someipServiceId"].asU32FromStringRequired() ), static_cast<uint16_t>( someipToCanBridgeConfig["someipInstanceId"].asU32FromStringRequired() ), static_cast<uint16_t>( someipToCanBridgeConfig["someipEventId"].asU32FromStringRequired() ), static_cast<uint16_t>( someipToCanBridgeConfig["someipEventGroupId"].asU32FromStringRequired() ), someipToCanBridgeConfig["someipApplicationName"].asStringRequired(), canChannelId, *mCANDataConsumer, []( std::string name ) -> std::shared_ptr<vsomeip::application> { return vsomeip::runtime::get()->create_application( name ); }, []( std::string name ) { vsomeip::runtime::get()->remove_application( name ); } ); if ( !bridge->connect() ) { FWE_LOG_ERROR( "Failed to initialize SomeipToCanBridge" ); return false; } // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &SomeipToCanBridge::onChangeOfActiveDictionary, bridge.get(), std::placeholders::_1, std::placeholders::_2 ) ); mSomeipToCanBridges.push_back( std::move( bridge ) ); } #endif #ifdef FWE_FEATURE_REMOTE_COMMANDS #ifdef FWE_FEATURE_SOMEIP else if ( interfaceType == SOMEIP_COMMAND_INTERFACE_TYPE ) { if ( mExampleSomeipCommandDispatcher != nullptr ) { continue; } auto exampleSomeipInterfaceWrapper = createExampleSomeipInterfaceWrapper( networkInterfaceConfig[SOMEIP_COMMAND_INTERFACE_TYPE]["someipApplicationName"].asStringRequired(), networkInterfaceConfig[SOMEIP_COMMAND_INTERFACE_TYPE]["someipInstance"] .asStringOptional() .get_value_or( "commonapi.ExampleSomeipInterface" ), mRawDataBufferManager.get(), true ); mExampleSomeipCommandDispatcher = std::make_shared<SomeipCommandDispatcher>( exampleSomeipInterfaceWrapper ); if ( !mActuatorCommandManager->registerDispatcher( interfaceId, mExampleSomeipCommandDispatcher ) ) { return false; } } #endif else if ( interfaceType == CAN_COMMAND_INTERFACE_TYPE ) { if ( mCanCommandDispatcher != nullptr ) { continue; } mCanCommandDispatcher = std::make_shared<CanCommandDispatcher>( EXAMPLE_CAN_INTERFACE_SUPPORTED_ACTUATOR_MAP, networkInterfaceConfig[CAN_COMMAND_INTERFACE_TYPE]["interfaceName"].asStringRequired(), mRawDataBufferManager.get() ); if ( !mActuatorCommandManager->registerDispatcher( interfaceId, mCanCommandDispatcher ) ) { return false; } } #endif #ifdef FWE_FEATURE_AAOS_VHAL else if ( interfaceType == AAOS_VHAL_INTERFACE_TYPE ) { if ( mAaosVhalSource != nullptr ) { continue; } mAaosVhalSource = std::make_shared<AaosVhalSource>( interfaceId, mSignalBufferDistributor ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &AaosVhalSource::onChangeOfActiveDictionary, mAaosVhalSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); } #endif #ifdef FWE_FEATURE_IWAVE_GPS else if ( interfaceType == IWAVE_GPS_INTERFACE_TYPE ) { if ( mIWaveGpsSource != nullptr ) { continue; } // coverity[autosar_cpp14_a20_8_4_violation] Shared pointer interface required for unit testing auto namedSignalDataSource = std::make_shared<NamedSignalDataSource>( interfaceId, mSignalBufferDistributor ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &NamedSignalDataSource::onChangeOfActiveDictionary, namedSignalDataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); auto iwaveGpsConfig = networkInterfaceConfig[IWAVE_GPS_INTERFACE_TYPE]; mIWaveGpsSource = std::make_shared<IWaveGpsSource>( namedSignalDataSource, iwaveGpsConfig[IWaveGpsSource::PATH_TO_NMEA].asStringRequired(), iwaveGpsConfig[IWaveGpsSource::LATITUDE_SIGNAL_NAME].asStringRequired(), iwaveGpsConfig[IWaveGpsSource::LONGITUDE_SIGNAL_NAME].asStringRequired(), iwaveGpsConfig[IWaveGpsSource::POLL_INTERVAL_MS].asU32Required() ); if ( !mIWaveGpsSource->connect() ) { FWE_LOG_ERROR( "IWaveGps initialization failed" ); return false; } } #endif #ifdef FWE_FEATURE_EXTERNAL_GPS else if ( interfaceType == EXTERNAL_GPS_INTERFACE_TYPE ) { if ( mExternalGpsSource != nullptr ) { continue; } // coverity[autosar_cpp14_a20_8_4_violation] Shared pointer interface required for unit testing auto namedSignalDataSource = std::make_shared<NamedSignalDataSource>( interfaceId, mSignalBufferDistributor ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange( std::bind( &NamedSignalDataSource::onChangeOfActiveDictionary, namedSignalDataSource.get(), std::placeholders::_1, std::placeholders::_2 ) ); auto externalGpsConfig = networkInterfaceConfig[EXTERNAL_GPS_INTERFACE_TYPE]; mExternalGpsSource = std::make_shared<ExternalGpsSource>( namedSignalDataSource, externalGpsConfig[ExternalGpsSource::LATITUDE_SIGNAL_NAME].asStringRequired(), externalGpsConfig[ExternalGpsSource::LONGITUDE_SIGNAL_NAME].asStringRequired() ); } #endif else if ( ( mNetworkInterfaceConfigHook != nullptr ) && mNetworkInterfaceConfigHook( networkInterfaceConfig ) ) { // External interface was configured } else { FWE_LOG_ERROR( interfaceType + " is not supported" ); } } #ifdef FWE_FEATURE_UDS_DTC mDiagnosticDataSource = std::make_shared<RemoteDiagnosticDataSource>( mDiagnosticNamedSignalDataSource, mRawDataBufferManager.get() #ifdef FWE_FEATURE_UDS_DTC_EXAMPLE , mExampleDiagnosticInterface #endif ); if ( !mDiagnosticDataSource->start() ) { FWE_LOG_ERROR( "Failed to start the Remote Diagnostics Data Source" ); return false; } mDataFetchManager->registerCustomFetchFunction( "DTC_QUERY", std::bind( &RemoteDiagnosticDataSource::DTC_QUERY, mDiagnosticDataSource.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3 ) ); #endif /********************************Data source bootstrap end*******************************/ /*******************************Custom function setup begin******************************/ #ifdef FWE_FEATURE_CUSTOM_FUNCTION_EXAMPLES mCollectionInspectionEngine->registerCustomFunction( "abs", { CustomFunctionMath::absFunc, nullptr, nullptr } ); mCollectionInspectionEngine->registerCustomFunction( "min", { CustomFunctionMath::minFunc, nullptr, nullptr } ); mCollectionInspectionEngine->registerCustomFunction( "max", { CustomFunctionMath::maxFunc, nullptr, nullptr } ); mCollectionInspectionEngine->registerCustomFunction( "pow", { CustomFunctionMath::powFunc, nullptr, nullptr } ); mCollectionInspectionEngine->registerCustomFunction( "log", { CustomFunctionMath::logFunc, nullptr, nullptr } ); mCollectionInspectionEngine->registerCustomFunction( "ceil", { CustomFunctionMath::ceilFunc, nullptr, nullptr } ); mCollectionInspectionEngine->registerCustomFunction( "floor", { CustomFunctionMath::floorFunc, nullptr, nullptr } ); mCustomFunctionMultiRisingEdgeTrigger = std::make_unique<CustomFunctionMultiRisingEdgeTrigger>( mNamedSignalDataSource, mRawDataBufferManager.get() ); mCollectionInspectionEngine->registerCustomFunction( "MULTI_RISING_EDGE_TRIGGER", { [this]( auto invocationId, const auto &args ) -> CustomFunctionInvokeResult { return mCustomFunctionMultiRisingEdgeTrigger->invoke( invocationId, args ); }, [this]( const auto &collectedSignalIds, auto timestamp, auto &collectedData ) { mCustomFunctionMultiRisingEdgeTrigger->conditionEnd( collectedSignalIds, timestamp, collectedData ); }, [this]( auto invocationId ) { mCustomFunctionMultiRisingEdgeTrigger->cleanup( invocationId ); } } ); #endif #ifdef FWE_FEATURE_SCRIPT_ENGINE if ( ( mAwsCredentialsProvider == nullptr ) || ( !config["staticConfig"].isMember( "scriptEngine" ) ) ) { FWE_LOG_TRACE( "Script engine support is disabled. Add 'credentialsProvider' and 'scriptEngine' section to " "the config to initialize it." ); } else { auto bucketRegion = config["staticConfig"]["scriptEngine"]["bucketRegion"].asStringRequired(); auto s3MaxConnections = config["staticConfig"]["scriptEngine"]["maxConnections"].asU32Required(); s3MaxConnections = s3MaxConnections > 0U ? s3MaxConnections : 1U; auto transferManagerConfiguration = std::make_shared<Aws::Transfer::TransferManagerConfiguration>( nullptr ); Aws::S3::Model::GetObjectRequest getObjectTemplate; getObjectTemplate.WithExpectedBucketOwner( config["staticConfig"]["scriptEngine"]["bucketOwner"].asStringRequired() ); transferManagerConfiguration->getObjectTemplate = getObjectTemplate; transferManagerConfiguration->transferStatusUpdatedCallback = // coverity[autosar_cpp14_a8_4_11_violation] smart pointer needed to match the expected signature [this]( const Aws::Transfer::TransferManager *transferManager, const std::shared_ptr<const Aws::Transfer::TransferHandle> &transferHandle ) { static_cast<void>( transferManager ); mCustomFunctionScriptEngine->transferStatusUpdatedCallback( transferHandle ); }; transferManagerConfiguration->errorCallback = // coverity[autosar_cpp14_a8_4_11_violation] smart pointer needed to match the expected signature [this]( const Aws::Transfer::TransferManager *transferManager, const std::shared_ptr<const Aws::Transfer::TransferHandle> &transferHandle, const Aws::Client::AWSError<Aws::S3::S3Errors> &error ) { static_cast<void>( transferManager ); mCustomFunctionScriptEngine->transferErrorCallback( transferHandle, error ); }; transferManagerConfiguration->transferInitiatedCallback = // coverity[autosar_cpp14_a8_4_11_violation] smart pointer needed to match the expected signature [this]( const Aws::Transfer::TransferManager *transferManager, const std::shared_ptr<const Aws::Transfer::TransferHandle> &transferHandle ) { static_cast<void>( transferManager ); mCustomFunctionScriptEngine->transferInitiatedCallback( transferHandle ); }; transferManagerConfiguration->transferExecutor = getTransferManagerExecutor().get(); auto createTransferManagerWrapper = [this, transferManagerConfiguration, bucketRegion, s3MaxConnections]() -> std::shared_ptr<TransferManagerWrapper> { Aws::Client::ClientConfigurationInitValues initValues; // The SDK can use IMDS to determine the region, but since we will pass the region we don't // want the SDK to use it, specially because in non-EC2 environments without any AWS SDK // config at all, this can cause delays when setting up the client: // https://github.com/aws/aws-sdk-cpp/issues/1511 initValues.shouldDisableIMDS = true; Aws::Client::ClientConfiguration clientConfiguration( initValues ); clientConfiguration.region = bucketRegion; clientConfiguration.maxConnections = s3MaxConnections; auto s3Client = std::make_shared<Aws::S3::S3Client>( mAwsCredentialsProvider, Aws::MakeShared<Aws::S3::S3EndpointProvider>( "S3Client" ), clientConfiguration ); transferManagerConfiguration->s3Client = s3Client; return std::make_shared<TransferManagerWrapper>( Aws::Transfer::TransferManager::Create( *transferManagerConfiguration ) ); }; mCustomFunctionScriptEngine = std::make_shared<CustomFunctionScriptEngine>( mNamedSignalDataSource, mRawDataBufferManager.get(), createTransferManagerWrapper, getAbsolutePath( config["staticConfig"]["persistency"]["persistencyPath"].asStringRequired() + "/scripts", configFileDirectoryPath ) .string(), config["staticConfig"]["scriptEngine"]["bucketName"].asStringRequired() ); } #endif #ifdef FWE_FEATURE_MICROPYTHON if ( ( mCustomFunctionScriptEngine == nullptr ) || ( !config["staticConfig"].isMember( "micropython" ) ) ) { FWE_LOG_TRACE( "MicroPython support is disabled. Add 'scriptEngine' and 'micropython' section to " "the config to initialize it." ); } else { mCustomFunctionMicroPython = std::make_unique<CustomFunctionMicroPython>( mCustomFunctionScriptEngine ); mCollectionInspectionEngine->registerCustomFunction( "python", CustomFunctionCallbacks{ [this]( auto invocationID, const auto &args ) -> CustomFunctionInvokeResult { return mCustomFunctionMicroPython->invoke( invocationID, args ); }, // coverity[autosar_cpp14_a5_1_9_violation] Duplicate lambda for ease of maintenance [this]( const auto &collectedSignalIds, auto timestamp, auto &collectedData ) { mCustomFunctionScriptEngine->conditionEnd( collectedSignalIds, timestamp, collectedData ); }, [this]( auto invocationID ) { mCustomFunctionMicroPython->cleanup( invocationID ); } } ); } #endif #ifdef FWE_FEATURE_CPYTHON if ( ( mCustomFunctionScriptEngine == nullptr ) || ( !config["staticConfig"].isMember( "cpython" ) ) ) { FWE_LOG_TRACE( "CPython support is disabled. Add 'scriptEngine' and 'cpython' section to " "the config to initialize it." ); } else { mCustomFunctionCPython = std::make_unique<CustomFunctionCPython>( mCustomFunctionScriptEngine ); mCollectionInspectionEngine->registerCustomFunction( "python", CustomFunctionCallbacks{ [this]( auto invocationID, const auto &args ) -> CustomFunctionInvokeResult { return mCustomFunctionCPython->invoke( invocationID, args ); }, // coverity[autosar_cpp14_a5_1_9_violation] Duplicate lambda for ease of maintenance [this]( const auto &collectedSignalIds, auto timestamp, auto &collectedData ) { mCustomFunctionScriptEngine->conditionEnd( collectedSignalIds, timestamp, collectedData ); }, [this]( auto invocationID ) { mCustomFunctionCPython->cleanup( invocationID ); } } ); } #endif /********************************Custom function setup end*******************************/ #ifdef FWE_FEATURE_REMOTE_COMMANDS /********************************Remote commands bootstrap begin***************************/ if ( receiverCommandRequest ) { mCommandSchema = std::make_unique<CommandSchema>( *receiverCommandRequest, mCommandResponses, mRawDataBufferManager.get() ); if ( receiverRejectedCommandResponse != nullptr ) { // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda receiverRejectedCommandResponse->subscribeToDataReceived( std::bind( &CommandSchema::onRejectedCommandResponseReceived, std::placeholders::_1 ) ); } // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCommandResponses->subscribeToNewDataAvailable( std::bind( &DataSenderManagerWorkerThread::onDataReadyToPublish, mDataSenderManagerWorkerThread.get() ) ); if ( !mActuatorCommandManager->start() ) { FWE_LOG_ERROR( "Failed to init and start the Command Manager" ); return false; } // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCollectionSchemeManagerPtr->subscribeToCustomSignalDecoderFormatMapChange( std::bind( &ActuatorCommandManager::onChangeOfCustomSignalDecoderFormatMap, mActuatorCommandManager.get(), std::placeholders::_1, std::placeholders::_2 ) ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda mCommandSchema->subscribeToActuatorCommandRequestReceived( std::bind( &ActuatorCommandManager::onReceivingCommandRequest, mActuatorCommandManager.get(), std::placeholders::_1 ) ); } /********************************Remote commands bootstrap end*****************************/ #endif #ifdef FWE_FEATURE_LAST_KNOWN_STATE /********************************Last known state bootstrap begin**************************/ if ( receiverCommandRequest && receiverLastKnownStateConfig ) { auto lastKnownStateInspector = std::make_unique<LastKnownStateInspector>( mCommandResponses, mCacheAndPersist ); auto lastKnownStateSignalBuffer = std::make_shared<SignalBuffer>( signalBufferSize, "LKS Signal Buffer", TraceAtomicVariable::QUEUE_CONSUMER_TO_LAST_KNOWN_STATE_INSPECTION, // Notify listeners when 10% of the buffer is full so that we don't // let it grow too much. signalBufferSize / 10 ); mSignalBufferDistributor.registerQueue( lastKnownStateSignalBuffer ); mLastKnownStateWorkerThread = std::make_shared<LastKnownStateWorkerThread>( lastKnownStateSignalBuffer, mLastKnownStateDataReadyToPublish, std::move( lastKnownStateInspector ), config["staticConfig"]["threadIdleTimes"]["lastKnownStateThreadIdleTimeMs"] .asU32Optional() .get_value_or( 0 ) ); mCollectionSchemeManagerPtr->subscribeToStateTemplatesChange( // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda std::bind( &LastKnownStateWorkerThread::onStateTemplatesChanged, mLastKnownStateWorkerThread.get(), std::placeholders::_1 ) ); mCommandSchema->subscribeToLastKnownStateCommandRequestReceived( // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda std::bind( &LastKnownStateWorkerThread::onNewCommandReceived, mLastKnownStateWorkerThread.get(), std::placeholders::_1 ) ); // coverity[autosar_cpp14_a18_9_1_violation] std::bind is easier to maintain than extra lambda lastKnownStateSignalBuffer->subscribeToNewDataAvailable( std::bind( &LastKnownStateWorkerThread::onNewDataAvailable, mLastKnownStateWorkerThread.get() ) ); if ( !mLastKnownStateWorkerThread->start() ) { FWE_LOG_ERROR( "Failed to init and start the Last Known State Inspection Engine" ); return false; } } else if ( receiverLastKnownStateConfig == nullptr ) { FWE_LOG_INFO( "Disabling LastKnownState because LastKnownState topics are not configured" ); } else if ( receiverCommandRequest == nullptr ) { FWE_LOG_WARN( "Disabling LastKnownState because command topics are not configured" ); } /********************************Last known state bootstrap end****************************/ #endif if ( mStartupConfigHook != nullptr ) { mStartupConfigHook( config ); } if ( !mCollectionInspectionWorkerThread->start() ) { FWE_LOG_ERROR( "Failed to start the Inspection Engine" ); return false; } // For asynchronous connect the call needs to be done after all senders and receivers are // created and all receiver listeners are subscribed. if ( !mConnectivityModule->connect() ) { return false; } if ( ( mRemoteProfiler != nullptr ) && ( !mRemoteProfiler->start() ) ) { FWE_LOG_WARN( "Failed to start the Remote Profiler - No remote profiling available until FWE restart" ); } if ( !mCheckinSender->start() ) { FWE_LOG_ERROR( "Failed to start the Checkin thread" ); return false; } // Only start the CollectionSchemeManager after all listeners have subscribed, otherwise // they will not be notified of the initial decoder manifest and collection schemes that are // read from persistent memory: if ( !mCollectionSchemeManagerPtr->connect() ) { FWE_LOG_ERROR( "Failed to start the CollectionScheme Manager" ); return false; } /****************************CollectionScheme Manager bootstrap end*************************/ if ( !mDataFetchManager->start() ) { FWE_LOG_ERROR( "Failed to start the DataFetchManager" ); return false; } mPrintMetricsCyclicPeriodMs = config["staticConfig"]["internalParameters"]["metricsCyclicPrintIntervalMs"].asU32Optional().get_value_or( 0 ); } catch ( const std::exception &e ) { FWE_LOG_ERROR( "Fatal Error during AWS IoT FleetWise Bootstrap: " + std::string( e.what() ) ); return false; } FWE_LOG_INFO( "Engine Connected" ); return true; } bool IoTFleetWiseEngine::disconnect() { if ( mShutdownConfigHook != nullptr ) { if ( !mShutdownConfigHook() ) { return false; } } #ifdef FWE_FEATURE_AAOS_VHAL mAaosVhalSource.reset(); #endif #ifdef FWE_FEATURE_EXTERNAL_GPS mExternalGpsSource.reset(); #endif #ifdef FWE_FEATURE_IWAVE_GPS mIWaveGpsSource.reset(); #endif #ifdef FWE_FEATURE_ROS2 if ( mROS2DataSource ) { mROS2DataSource->disconnect(); } #endif #ifdef FWE_FEATURE_SOMEIP for ( auto &bridge : mSomeipToCanBridges ) { bridge->disconnect(); } mSomeipDataSource.reset(); mExampleSomeipCommandDispatcher.reset(); if ( mDeviceShadowOverSomeip ) { if ( !CommonAPI::Runtime::get()->unregisterService( "local", v1::commonapi::DeviceShadowOverSomeipInterface::getInterface(), mDeviceShadowOverSomeipInstanceName ) ) { FWE_LOG_ERROR( "Failed to unregister DeviceShadowOverSomeip service" ); return false; } } mDeviceShadowOverSomeip.reset(); #endif if ( mOBDOverCANModule ) { if ( !mOBDOverCANModule->disconnect() ) { FWE_LOG_ERROR( "Could not disconnect OBD over CAN module" ); return false; } } #ifdef FWE_FEATURE_UDS_DTC_EXAMPLE if ( mExampleDiagnosticInterface != nullptr ) { if ( !mExampleDiagnosticInterface->stop() ) { FWE_LOG_ERROR( "Could not stop DiagnosticInterface" ); return false; } } #endif #ifdef FWE_FEATURE_UDS_DTC if ( mDiagnosticDataSource != nullptr ) { if ( !mDiagnosticDataSource->stop() ) { FWE_LOG_ERROR( "Could not stop DiagnosticDataSource" ); return false; } } #endif if ( !mDataFetchManager->stop() ) { FWE_LOG_ERROR( "Could not stop the DataFetchManager" ); return false; } if ( !mCollectionInspectionWorkerThread->stop() ) { FWE_LOG_ERROR( "Could not stop the Inspection Engine" ); return false; } setLogForwarding( nullptr ); if ( ( mRemoteProfiler != nullptr ) && ( !mRemoteProfiler->stop() ) ) { FWE_LOG_ERROR( "Could not stop the Remote Profiler" ); return false; } if ( !mCollectionSchemeManagerPtr->disconnect() ) { FWE_LOG_ERROR( "Could not stop the CollectionScheme Manager" ); return false; } if ( !mCheckinSender->stop() ) { FWE_LOG_ERROR( "Failed to stop the Checkin thread" ); return false; } for ( auto &source : mCANDataSources ) { if ( !source->disconnect() ) { FWE_LOG_ERROR( "Could not disconnect CAN data source" ); return false; } } if ( !mConnectivityModule->disconnect() ) { FWE_LOG_ERROR( "Could not disconnect the offboard connectivity" ); return false; } #ifdef FWE_FEATURE_REMOTE_COMMANDS if ( mActuatorCommandManager != nullptr ) { if ( !mActuatorCommandManager->stop() ) { FWE_LOG_ERROR( "Could not stop the ActuatorCommandManager" ); return false; } } #endif #ifdef FWE_FEATURE_STORE_AND_FORWARD // iot jobs depends on stream forwarder, // so only stop forwarder after connectivity module is disconnected if ( mStreamForwarder ) { if ( !mStreamForwarder->stop() ) { FWE_LOG_ERROR( "Could not stop the SteamForwarder" ); return false; } } #endif #ifdef FWE_FEATURE_LAST_KNOWN_STATE if ( ( mLastKnownStateWorkerThread != nullptr ) && ( !mLastKnownStateWorkerThread->stop() ) ) { FWE_LOG_ERROR( "Could not stop the Last Known State Inspection Thread" ); return false; } #endif if ( !mDataSenderManagerWorkerThread->stop() ) { FWE_LOG_ERROR( "Could not stop the DataSenderManager" ); return false; } #ifdef FWE_FEATURE_VISION_SYSTEM_DATA if ( mS3Sender != nullptr ) { if ( !mS3Sender->disconnect() ) { FWE_LOG_ERROR( "Could not disconnect the S3Sender" ); return false; } } #endif #ifdef FWE_FEATURE_SCRIPT_ENGINE if ( mCustomFunctionScriptEngine ) { mCustomFunctionScriptEngine->shutdown(); } #endif #ifdef FWE_FEATURE_MICROPYTHON mCustomFunctionMicroPython.reset(); #endif #ifdef FWE_FEATURE_CPYTHON mCustomFunctionCPython.reset(); #endif #ifdef FWE_FEATURE_SCRIPT_ENGINE mCustomFunctionScriptEngine.reset(); #endif FWE_LOG_INFO( "Engine Disconnected" ); TraceModule::get().sectionEnd( TraceSection::FWE_SHUTDOWN ); TraceModule::get().print(); return true; } bool IoTFleetWiseEngine::start() { // Prevent concurrent stop/init std::lock_guard<std::mutex> lock( mThreadMutex ); // On multi core systems the shared variable mShouldStop must be updated for // all cores before starting the thread otherwise thread will directly end mShouldStop.store( false ); if ( !mThread.create( [this]() { this->doWork(); } ) ) { FWE_LOG_TRACE( "Engine Thread failed to start" ); } else { FWE_LOG_TRACE( "Engine Thread started" ); mThread.setThreadName( "fwEMEngine" ); } return mThread.isActive() && mThread.isValid(); } bool IoTFleetWiseEngine::stop() { TraceModule::get().sectionBegin( TraceSection::FWE_SHUTDOWN ); std::lock_guard<std::mutex> lock( mThreadMutex ); mShouldStop.store( true, std::memory_order_relaxed ); mWait.notify(); mThread.release(); mShouldStop.store( false, std::memory_order_relaxed ); return !mThread.isActive(); } bool IoTFleetWiseEngine::shouldStop() const { return mShouldStop.load( std::memory_order_relaxed ); } bool IoTFleetWiseEngine::isAlive() { return mThread.isValid() && mThread.isActive(); } void IoTFleetWiseEngine::doWork() { TraceModule::get().sectionEnd( TraceSection::FWE_STARTUP ); while ( !shouldStop() ) { mTimer.reset(); uint64_t minTimeToWaitMs = UINT64_MAX; if ( mPrintMetricsCyclicPeriodMs != 0 ) { uint64_t timeToWaitMs = mPrintMetricsCyclicPeriodMs - std::min( static_cast<uint64_t>( mPrintMetricsCyclicTimer.getElapsedMs().count() ), mPrintMetricsCyclicPeriodMs ); minTimeToWaitMs = std::min( minTimeToWaitMs, timeToWaitMs ); } if ( minTimeToWaitMs < UINT64_MAX ) { FWE_LOG_TRACE( "Waiting for: " + std::to_string( minTimeToWaitMs ) + " ms. Cyclic metrics print:" + std::to_string( mPrintMetricsCyclicPeriodMs ) + " configured, " + std::to_string( mPrintMetricsCyclicTimer.getElapsedMs().count() ) + " timer." ); mWait.wait( static_cast<uint32_t>( minTimeToWaitMs ) ); } else { mWait.wait( Signal::WaitWithPredicate ); auto elapsedTimeMs = mTimer.getElapsedMs().count(); FWE_LOG_TRACE( "Event arrived. Time elapsed waiting for the event: " + std::to_string( elapsedTimeMs ) + " ms" ); } if ( ( mPrintMetricsCyclicPeriodMs > 0 ) && ( static_cast<uint64_t>( mPrintMetricsCyclicTimer.getElapsedMs().count() ) >= mPrintMetricsCyclicPeriodMs ) ) { mPrintMetricsCyclicTimer.reset(); TraceModule::get().print(); TraceModule::get().startNewObservationWindow( static_cast<uint32_t>( mPrintMetricsCyclicPeriodMs ) ); } } } std::string IoTFleetWiseEngine::getStatusSummary() { if ( mConnectivityModule == nullptr || mCollectionSchemeManagerPtr == nullptr || mMqttSender == nullptr || mOBDOverCANModule == nullptr ) { return ""; } std::string status; status += std::string( "MQTT connection: " ) + ( mConnectivityModule->isAlive() ? "CONNECTED" : "NOT CONNECTED" ) + "\n\n"; status += "Campaign ARNs:\n"; auto collectionSchemeArns = mCollectionSchemeManagerPtr->getCollectionSchemeArns(); if ( collectionSchemeArns.empty() ) { status += "NONE\n"; } else { for ( auto &collectionSchemeArn : collectionSchemeArns ) { status += collectionSchemeArn + "\n"; } } status += "\n"; status += "Payloads sent: " + std::to_string( mMqttSender->getPayloadCountSent() ) + "\n\n"; return status; } } // namespace IoTFleetWise } // namespace Aws