src/CollectionInspectionWorkerThread.cpp (355 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 #include "aws/iotfleetwise/CollectionInspectionWorkerThread.h" #include "aws/iotfleetwise/LoggingModule.h" #include "aws/iotfleetwise/QueueTypes.h" #include "aws/iotfleetwise/SignalTypes.h" #include "aws/iotfleetwise/TraceModule.h" #include <algorithm> #include <string> #include <utility> #include <vector> namespace Aws { namespace IoTFleetWise { CollectionInspectionWorkerThread::CollectionInspectionWorkerThread( CollectionInspectionEngine &collectionInspectionEngine, std::shared_ptr<SignalBuffer> inputSignalBuffer, std::shared_ptr<DataSenderQueue> outputCollectedData, uint32_t idleTimeMs, RawData::BufferManager *rawDataBufferManager ) : mCollectionInspectionEngine( collectionInspectionEngine ) , mInputSignalBuffer( std::move( inputSignalBuffer ) ) , mOutputCollectedData( std::move( outputCollectedData ) ) , mRawDataBufferManager( rawDataBufferManager ) { if ( idleTimeMs != 0 ) { mIdleTimeMs = idleTimeMs; } } bool CollectionInspectionWorkerThread::start() { if ( ( mInputSignalBuffer == nullptr ) || ( mOutputCollectedData == nullptr ) ) { FWE_LOG_ERROR( "Collection Engine cannot be started without correct configurations" ); return false; } // Prevent concurrent stop/init std::lock_guard<std::mutex> lock( mThreadMutex ); // On multi core systems the shared variable mShouldStop must be updated for // all cores before starting the thread otherwise thread will directly end mShouldStop.store( false ); if ( !mThread.create( [this]() { this->doWork(); } ) ) { FWE_LOG_TRACE( "Inspection Thread failed to start" ); } else { FWE_LOG_TRACE( "Inspection Thread started" ); mThread.setThreadName( "fwDICollInsEng" ); } return mThread.isActive() && mThread.isValid(); } bool CollectionInspectionWorkerThread::stop() { if ( ( !mThread.isValid() ) || ( !mThread.isActive() ) ) { return true; } std::lock_guard<std::mutex> lock( mThreadMutex ); mShouldStop.store( true, std::memory_order_relaxed ); FWE_LOG_TRACE( "Request stop" ); mWait.notify(); mThread.release(); FWE_LOG_TRACE( "Stop finished" ); mShouldStop.store( false, std::memory_order_relaxed ); return !mThread.isActive(); } bool CollectionInspectionWorkerThread::shouldStop() const { return mShouldStop.load( std::memory_order_relaxed ); } void CollectionInspectionWorkerThread::onChangeInspectionMatrix( std::shared_ptr<const InspectionMatrix> inspectionMatrix ) { std::lock_guard<std::mutex> lock( mInspectionMatrixMutex ); mUpdatedInspectionMatrix = inspectionMatrix; mUpdatedInspectionMatrixAvailable = true; FWE_LOG_TRACE( "New inspection matrix handed over" ); // Wake up the thread. mWait.notify(); } void CollectionInspectionWorkerThread::onNewDataAvailable() { mWait.notify(); } void CollectionInspectionWorkerThread::doWork() { TimePoint lastTimeEvaluated = { 0, 0 }; Timestamp lastTraceOutput = 0; uint32_t statisticInputMessagesProcessed = 0; uint32_t statisticDataSentOut = 0; uint32_t activations = 0; while ( true ) { activations++; if ( mUpdatedInspectionMatrixAvailable ) { std::shared_ptr<const InspectionMatrix> newInspectionMatrix; { std::lock_guard<std::mutex> lock( mInspectionMatrixMutex ); mUpdatedInspectionMatrixAvailable = false; newInspectionMatrix = mUpdatedInspectionMatrix; } mCollectionInspectionEngine.onChangeInspectionMatrix( newInspectionMatrix, mClock->timeSinceEpoch() ); } // Only run the main inspection loop if there is an inspection matrix // Otherwise, go to sleep. if ( mUpdatedInspectionMatrix ) { TimePoint currentTime = mClock->timeSinceEpoch(); uint32_t waitTimeMs = mIdleTimeMs; // Consume any new signals and pass them over to the inspection Engine auto consumeSignalGroups = [&]( const CollectedDataFrame &dataFrame ) { TraceModule::get().incrementVariable( TraceVariable::CE_PROCESSED_DATA_FRAMES ); if ( !dataFrame.mCollectedSignals.empty() ) { for ( auto &inputSignal : dataFrame.mCollectedSignals ) { TraceModule::get().decrementAtomicVariable( TraceAtomicVariable::QUEUE_CONSUMER_TO_INSPECTION_SIGNALS ); TraceModule::get().incrementVariable( TraceVariable::CE_PROCESSED_SIGNALS ); auto signalValue = inputSignal.getValue(); switch ( signalValue.getType() ) { case SignalType::UINT8: mCollectionInspectionEngine.addNewSignal<uint8_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.uint8Val ); break; case SignalType::INT8: mCollectionInspectionEngine.addNewSignal<int8_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.int8Val ); break; case SignalType::UINT16: mCollectionInspectionEngine.addNewSignal<uint16_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.uint16Val ); break; case SignalType::INT16: mCollectionInspectionEngine.addNewSignal<int16_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.int16Val ); break; case SignalType::UINT32: mCollectionInspectionEngine.addNewSignal<uint32_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.uint32Val ); break; case SignalType::INT32: mCollectionInspectionEngine.addNewSignal<int32_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.int32Val ); break; case SignalType::UINT64: mCollectionInspectionEngine.addNewSignal<uint64_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.uint64Val ); break; case SignalType::INT64: mCollectionInspectionEngine.addNewSignal<int64_t>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.int64Val ); break; case SignalType::FLOAT: mCollectionInspectionEngine.addNewSignal<float>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.floatVal ); break; case SignalType::DOUBLE: mCollectionInspectionEngine.addNewSignal<double>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.doubleVal ); break; case SignalType::BOOLEAN: mCollectionInspectionEngine.addNewSignal<bool>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.boolVal ); break; case SignalType::STRING: mCollectionInspectionEngine.addNewSignal<RawData::BufferHandle>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.uint32Val ); if ( mRawDataBufferManager != nullptr ) { mRawDataBufferManager->decreaseHandleUsageHint( inputSignal.signalID, signalValue.value.uint32Val, RawData::BufferHandleUsageStage::COLLECTED_NOT_IN_HISTORY_BUFFER ); } break; case SignalType::UNKNOWN: FWE_LOG_WARN( "UNKNOWN signal [signal id: " + std::to_string( inputSignal.signalID ) + " ] should not be processed" ); break; #ifdef FWE_FEATURE_VISION_SYSTEM_DATA case SignalType::COMPLEX_SIGNAL: mCollectionInspectionEngine.addNewSignal<RawData::BufferHandle>( inputSignal.signalID, inputSignal.fetchRequestID, timePointFromSystemTime( currentTime, inputSignal.receiveTime ), currentTime.monotonicTimeMs, signalValue.value.uint32Val ); if ( mRawDataBufferManager != nullptr ) { mRawDataBufferManager->decreaseHandleUsageHint( inputSignal.signalID, signalValue.value.uint32Val, RawData::BufferHandleUsageStage::COLLECTED_NOT_IN_HISTORY_BUFFER ); } break; #endif } statisticInputMessagesProcessed++; } } // Consume any Active DTCs // We could check if the DTCs have changed here, but not necessary // as we are looking at only the latest known DTCs. // We only pop one item from the Buffer for a reason : DTCs represent // the health of all ECUs in the network. The Inspection Engine does // not need to know that topology and thus counts on the OBD Module // to aggregate all DTCs from all ECUs in one single Item. if ( dataFrame.mActiveDTCs != nullptr ) { TraceModule::get().decrementAtomicVariable( TraceAtomicVariable::QUEUE_CONSUMER_TO_INSPECTION_DTCS ); TraceModule::get().incrementVariable( TraceVariable::CE_PROCESSED_DTCS ); mCollectionInspectionEngine.setActiveDTCs( *dataFrame.mActiveDTCs.get() ); statisticInputMessagesProcessed++; } lastTimeEvaluated = mClock->timeSinceEpoch(); mCollectionInspectionEngine.evaluateConditions( lastTimeEvaluated ); // Initiate data collection and upload after every condition evaluation statisticDataSentOut += collectDataAndUpload( waitTimeMs ); }; auto consumed = mInputSignalBuffer->consumeAll( consumeSignalGroups ); // If nothing was consumed and at least the evaluate interval has elapsed, evaluate the // conditions to check heartbeat campaigns: if ( ( consumed == 0 ) && ( ( mClock->monotonicTimeSinceEpochMs() - lastTimeEvaluated.monotonicTimeMs ) >= EVALUATE_INTERVAL_MS ) ) { lastTimeEvaluated = mClock->timeSinceEpoch(); mCollectionInspectionEngine.evaluateConditions( lastTimeEvaluated ); statisticDataSentOut += collectDataAndUpload( waitTimeMs ); } // Nothing is in the ring buffer to consume. Go to idle mode for some time. uint32_t timeToWait = std::min( waitTimeMs, mIdleTimeMs ); // Print only every THREAD_IDLE_TIME_MS to avoid console spam if ( mClock->monotonicTimeSinceEpochMs() > ( lastTraceOutput + LoggingModule::LOG_AGGREGATION_TIME_MS ) ) { FWE_LOG_TRACE( "Activations: " + std::to_string( activations ) + ". Waiting for some data to come. Idling for: " + std::to_string( timeToWait ) + " ms or until notify. Since last idling processed " + std::to_string( statisticInputMessagesProcessed ) + " incoming data packages and sent out " + std::to_string( statisticDataSentOut ) + " packages out" ); activations = 0; statisticInputMessagesProcessed = 0; statisticDataSentOut = 0; lastTraceOutput = mClock->monotonicTimeSinceEpochMs(); } mWait.wait( timeToWait ); } else { // No inspection Matrix available. Wait for it from the CollectionScheme manager mWait.wait( Signal::WaitWithPredicate ); } if ( shouldStop() ) { break; } } } uint32_t CollectionInspectionWorkerThread::collectDataAndUpload( uint32_t &waitTimeMs ) { uint32_t collectedDataPackages = 0; auto collectedData = this->mCollectionInspectionEngine.collectNextDataToSend( this->mClock->timeSinceEpoch(), waitTimeMs ); while ( ( ( collectedData.triggeredCollectionSchemeData != nullptr ) #ifdef FWE_FEATURE_VISION_SYSTEM_DATA || ( collectedData.triggeredVisionSystemData != nullptr ) #endif ) && ( !this->shouldStop() ) ) { TraceModule::get().incrementVariable( TraceVariable::CE_TRIGGERS ); if ( collectedData.triggeredCollectionSchemeData != nullptr ) { if ( this->mOutputCollectedData->push( collectedData.triggeredCollectionSchemeData ) ) { collectedDataPackages++; } else { FWE_LOG_WARN( "Collected data output buffer is full" ); } } #ifdef FWE_FEATURE_VISION_SYSTEM_DATA if ( collectedData.triggeredVisionSystemData != nullptr ) { if ( this->mOutputCollectedData->push( collectedData.triggeredVisionSystemData ) ) { collectedDataPackages++; } else { FWE_LOG_WARN( "Collected data output buffer is full, Vision System Data could not be pushed" ); } } #endif collectedData = this->mCollectionInspectionEngine.collectNextDataToSend( this->mClock->timeSinceEpoch(), waitTimeMs ); } return collectedDataPackages; } bool CollectionInspectionWorkerThread::isAlive() { return mThread.isValid() && mThread.isActive(); } CollectionInspectionWorkerThread::~CollectionInspectionWorkerThread() { // To make sure the thread stops during teardown of tests. if ( isAlive() ) { stop(); } } } // namespace IoTFleetWise } // namespace Aws