src/SomeipDataSource.cpp (215 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 #include "aws/iotfleetwise/SomeipDataSource.h" #include "aws/iotfleetwise/LoggingModule.h" #include "aws/iotfleetwise/SignalTypes.h" #include "aws/iotfleetwise/Thread.h" #include <chrono> #include <cstdint> #include <string> #include <utility> #include <vector> namespace Aws { namespace IoTFleetWise { SomeipDataSource::SomeipDataSource( std::shared_ptr<ExampleSomeipInterfaceWrapper> exampleSomeipInterfaceWrapper, std::shared_ptr<NamedSignalDataSource> namedSignalDataSource, RawData::BufferManager *rawDataBufferManager, uint32_t cyclicUpdatePeriodMs ) : mExampleSomeipInterfaceWrapper( std::move( exampleSomeipInterfaceWrapper ) ) , mNamedSignalDataSource( std::move( namedSignalDataSource ) ) , mRawDataBufferManager( rawDataBufferManager ) , mCyclicUpdatePeriodMs( cyclicUpdatePeriodMs ) { } SomeipDataSource::~SomeipDataSource() { if ( mThread.joinable() ) { mShouldStop = true; mThread.join(); } if ( mProxy ) { if ( mXSubscription != 0 ) { mProxy->getXAttribute().getChangedEvent().unsubscribe( mXSubscription ); } if ( mTemperatureSubscription != 0 ) { mProxy->getTemperatureAttribute().getChangedEvent().unsubscribe( mTemperatureSubscription ); } if ( mA1Subscription != 0 ) { mProxy->getA1Attribute().getChangedEvent().unsubscribe( mA1Subscription ); } } // coverity[cert_err50_cpp_violation] false positive - join is called to exit the previous thread // coverity[autosar_cpp14_a15_5_2_violation] false positive - join is called to exit the previous thread } void SomeipDataSource::pushXValue( const int32_t &val ) { mNamedSignalDataSource->ingestSignalValue( 0, "Vehicle.ExampleSomeipInterface.X", DecodedSignalValue{ val, SignalType::UINT32 } ); } void SomeipDataSource::pushTemperatureValue( const int32_t &val ) { mNamedSignalDataSource->ingestSignalValue( 0, "Vehicle.ExampleSomeipInterface.Temperature", DecodedSignalValue{ val, SignalType::INT32 } ); } void SomeipDataSource::pushA1Value( const v1::commonapi::CommonTypes::a1Struct &val ) { std::vector<std::pair<std::string, DecodedSignalValue>> values; values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A", DecodedSignalValue{ val.getA(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.B", DecodedSignalValue{ val.getB(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.C", DecodedSignalValue{ val.getC(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.D", DecodedSignalValue{ val.getD(), SignalType::INT64 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.E", DecodedSignalValue{ val.getE(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.F", DecodedSignalValue{ val.getF(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.G", DecodedSignalValue{ val.getG(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.H", DecodedSignalValue{ val.getH(), SignalType::INT64 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.I", DecodedSignalValue{ val.getI(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.J", DecodedSignalValue{ val.getJ(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.K", DecodedSignalValue{ val.getK(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.L", DecodedSignalValue{ val.getL(), SignalType::INT64 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.A", DecodedSignalValue{ val.getA2().getA(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.B", DecodedSignalValue{ val.getA2().getB(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.D", DecodedSignalValue{ val.getA2().getD(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.E", DecodedSignalValue{ val.getA2().getE(), SignalType::INT64 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.F", DecodedSignalValue{ val.getA2().getF(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.G", DecodedSignalValue{ val.getA2().getG(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.H", DecodedSignalValue{ val.getA2().getH(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.I", DecodedSignalValue{ val.getA2().getI(), SignalType::INT64 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.J", DecodedSignalValue{ val.getA2().getJ(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.K", DecodedSignalValue{ val.getA2().getK(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.L", DecodedSignalValue{ val.getA2().getL(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.M", DecodedSignalValue{ val.getA2().getM(), SignalType::INT64 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.N", DecodedSignalValue{ val.getA2().getN(), SignalType::INT32 } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.O", DecodedSignalValue{ val.getA2().getO(), SignalType::BOOLEAN } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.P", DecodedSignalValue{ val.getA2().getP(), SignalType::DOUBLE } ) ); values.emplace_back( std::make_pair( "Vehicle.ExampleSomeipInterface.A1.A2.Q", DecodedSignalValue{ val.getA2().getQ(), SignalType::INT64 } ) ); pushStringSignalToNamedDataSource( "Vehicle.ExampleSomeipInterface.A1.S", val.getS() ); mNamedSignalDataSource->ingestMultipleSignalValues( 0, values ); } void SomeipDataSource::pushStringSignalToNamedDataSource( const std::string &signalName, const std::string &stringValue ) { SignalID signalID = mNamedSignalDataSource->getNamedSignalID( signalName ); if ( signalID == INVALID_SIGNAL_ID ) { FWE_LOG_TRACE( "No decoding rules set for signal name " + signalName ); return; } if ( mRawDataBufferManager == nullptr ) { FWE_LOG_WARN( "Raw message id: " + std::to_string( signalID ) + " can not be handed over to RawBufferManager" ); return; } auto receiveTime = mClock->systemTimeSinceEpochMs(); std::vector<uint8_t> buffer( stringValue.begin(), stringValue.end() ); auto bufferHandle = mRawDataBufferManager->push( ( buffer.data() ), buffer.size(), receiveTime, signalID ); if ( bufferHandle == RawData::INVALID_BUFFER_HANDLE ) { FWE_LOG_WARN( "Raw message id: " + std::to_string( signalID ) + " was rejected by RawBufferManager" ); return; } // immediately set usage hint so buffer handle does not get directly deleted again mRawDataBufferManager->increaseHandleUsageHint( signalID, bufferHandle, RawData::BufferHandleUsageStage::COLLECTED_NOT_IN_HISTORY_BUFFER ); mNamedSignalDataSource->ingestSignalValue( receiveTime, signalName, DecodedSignalValue{ bufferHandle, SignalType::STRING } ); } bool SomeipDataSource::connect() { if ( !mExampleSomeipInterfaceWrapper->init() ) { return false; } mProxy = std::dynamic_pointer_cast<v1::commonapi::ExampleSomeipInterfaceProxy<>>( mExampleSomeipInterfaceWrapper->getProxy() ); mXSubscription = mProxy->getXAttribute().getChangedEvent().subscribe( [this]( const int32_t &val ) { std::lock_guard<std::mutex> lock( mLastValMutex ); mLastXVal = val; mLastXValAvailable = true; pushXValue( val ); } ); mTemperatureSubscription = mProxy->getTemperatureAttribute().getChangedEvent().subscribe( [this]( const int32_t &val ) { std::lock_guard<std::mutex> lock( mLastValMutex ); mLastTemperatureVal = val; mLastTemperatureValAvailable = true; pushTemperatureValue( val ); } ); mA1Subscription = mProxy->getA1Attribute().getChangedEvent().subscribe( [this]( const v1::commonapi::CommonTypes::a1Struct &val ) { std::lock_guard<std::mutex> lock( mLastValMutex ); mLastA1Val = val; mLastA1ValAvailable = true; pushA1Value( val ); } ); if ( mCyclicUpdatePeriodMs > 0 ) { mThread = std::thread( [this]() { Thread::setCurrentThreadName( "SomeipDataSource" ); while ( !mShouldStop ) { // If the proxy is available, push the last vals periodically: if ( !mProxy->isAvailable() ) { std::lock_guard<std::mutex> lock( mLastValMutex ); mLastXValAvailable = false; mLastTemperatureValAvailable = false; mLastA1ValAvailable = false; } else { std::lock_guard<std::mutex> lock( mLastValMutex ); if ( mLastXValAvailable ) { pushXValue( mLastXVal ); } if ( mLastTemperatureValAvailable ) { pushTemperatureValue( mLastTemperatureVal ); } if ( mLastA1ValAvailable ) { pushA1Value( mLastA1Val ); } } std::this_thread::sleep_for( std::chrono::milliseconds( mCyclicUpdatePeriodMs ) ); } } ); } FWE_LOG_INFO( "Successfully initialized" ); return true; } } // namespace IoTFleetWise } // namespace Aws