src/RemoteProfiler.cpp (254 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#include "aws/iotfleetwise/RemoteProfiler.h"
#include "aws/iotfleetwise/IConnectionTypes.h"
#include "aws/iotfleetwise/LoggingModule.h"
#include "aws/iotfleetwise/TopicConfig.h"
#include "aws/iotfleetwise/TraceModule.h"
#include <algorithm>
#include <limits>
#include <utility>
#include <vector>
namespace Aws
{
namespace IoTFleetWise
{
RemoteProfiler::RemoteProfiler( ISender &sender,
uint32_t initialMetricsUploadInterval,
uint32_t initialLogMaxInterval,
LogLevel initialLogLevelThresholdToSend,
std::string profilerPrefix )
: fShouldStop( false )
, mMqttSender( sender )
, fCurrentMetricsPending( 0 )
, fInitialUploadInterval( initialMetricsUploadInterval )
, fInitialLogMaxInterval( initialLogMaxInterval )
, fLastTimeMetricsSentOut( 0 )
, fLastTimeMLogsSentOut( 0 )
, fLastTimeExecutionEnvironmentMetricsCollected( fClock->monotonicTimeSinceEpochMs() )
, fLogLevelThreshold( initialLogLevelThresholdToSend )
, fProfilerPrefix( std::move( profilerPrefix ) )
, fCurrentUserPayloadInLogRoot( 0 )
{
initLogStructure();
fLastCPURUsage.reportCPUUsageInfo();
CPUUsageInfo::reportPerThreadUsageData( fLastThreadUsage );
}
void
RemoteProfiler::initLogStructure()
{
fLogRoot.clear();
fLogRoot[NAME_TOP_LEVEL_LOG_PREFIX] = fProfilerPrefix;
fLogRoot[NAME_TOP_LEVEL_LOG_ARRAY] = Json::arrayValue;
fCurrentUserPayloadInLogRoot = 0;
}
void
RemoteProfiler::sendMetricsOut()
{
Json::StreamWriterBuilder builder;
builder["indentation"] = ""; // If you want whitespace-less output
const std::string output = Json::writeString( builder, fMetricsRoot );
mMqttSender.sendBuffer( mMqttSender.getTopicConfig().metricsTopic,
reinterpret_cast<const uint8_t *>( output.c_str() ),
output.length(),
[]( ConnectivityError result ) {
if ( result == ConnectivityError::Success )
{
FWE_LOG_ERROR( "Send error " + std::to_string( static_cast<uint32_t>( result ) ) );
}
} );
fMetricsRoot.clear();
fCurrentMetricsPending = 0;
}
void
RemoteProfiler::sendLogsOut()
{
if ( fCurrentUserPayloadInLogRoot > 0 )
{
std::string output;
{
// No logging in this area as this will deadlock
std::lock_guard<std::mutex> lock( loggingMutex );
Json::StreamWriterBuilder builder;
builder["indentation"] = ""; // If you want whitespace-less output
output = Json::writeString( builder, fLogRoot );
initLogStructure();
}
mMqttSender.sendBuffer( mMqttSender.getTopicConfig().logsTopic,
reinterpret_cast<const uint8_t *>( output.c_str() ),
output.length(),
[]( ConnectivityError result ) {
if ( result == ConnectivityError::Success )
{
FWE_LOG_ERROR( "Send error " +
std::to_string( static_cast<uint32_t>( result ) ) );
}
} );
}
}
void
RemoteProfiler::logMessage( LogLevel level,
const std::string &filename,
const uint32_t lineNumber,
const std::string &function,
const std::string &logEntry )
{
if ( level < fLogLevelThreshold )
{
return;
}
Json::Value logNode;
const std::string timestamp = fClock->currentTimeToIsoString();
logNode["logLevel"] = levelToString( level );
logNode["logFile"] = filename;
logNode["logLineNumber"] = lineNumber;
logNode["logFunction"] = function;
logNode["logEntry"] = logEntry;
logNode["logTimestamp"] = timestamp;
uint32_t size = static_cast<uint32_t>( filename.length() + function.length() + logEntry.length() +
timestamp.length() + JSON_MAX_OVERHEAD_BYTES_PER_LOG );
bool sendOutBeforeAdding = false;
{
std::lock_guard<std::mutex> lock( loggingMutex );
if ( size + fCurrentUserPayloadInLogRoot > MAX_BYTES_FOR_SINGLE_LOG_UPLOAD )
{
sendOutBeforeAdding = true;
}
else
{
fLogRoot[NAME_TOP_LEVEL_LOG_ARRAY].append( logNode );
fCurrentUserPayloadInLogRoot += size;
}
}
if ( sendOutBeforeAdding )
{
sendLogsOut();
{
std::lock_guard<std::mutex> lock( loggingMutex );
fLogRoot[NAME_TOP_LEVEL_LOG_ARRAY].append( logNode );
fCurrentUserPayloadInLogRoot += size;
}
}
}
void
RemoteProfiler::flush()
{
sendLogsOut();
}
void
RemoteProfiler::setMetric( const std::string &name, double value, const std::string &unit )
{
if ( fCurrentMetricsPending > MAX_PARALLEL_METRICS )
{
sendMetricsOut();
}
fCurrentMetricsPending++;
Json::Value metric;
metric["name"] = fProfilerPrefix + "_" + name;
metric["value"] = Json::Value( value );
metric["unit"] = unit;
fMetricsRoot["metric" + std::to_string( fCurrentMetricsPending )] = metric;
}
bool
RemoteProfiler::start()
{
if ( ( ( fInitialLogMaxInterval == 0 ) && ( fLogLevelThreshold != LogLevel::Off ) ) ||
( ( fInitialLogMaxInterval != 0 ) && ( fLogLevelThreshold == LogLevel::Off ) ) )
{
FWE_LOG_WARN( "Logging is turned off by putting LogLevel Threshold to Off but log max interval is not "
"0, which is implausible" );
}
// Prevent concurrent stop/init
std::lock_guard<std::mutex> lock( fThreadMutex );
// On multi core systems the shared variable fShouldStop must be updated for
// all cores before starting the thread otherwise thread will directly end
fShouldStop.store( false );
if ( !fThread.create( [this]() {
this->doWork();
} ) )
{
FWE_LOG_TRACE( "Remote Profiler Thread failed to start" );
}
else
{
FWE_LOG_TRACE( "Remote Profiler Thread started" );
fThread.setThreadName( "fwCNProfiler" );
}
return fThread.isActive() && fThread.isValid();
}
bool
RemoteProfiler::stop()
{
if ( ( !fThread.isValid() ) || ( !fThread.isActive() ) )
{
return true;
}
std::lock_guard<std::mutex> lock( fThreadMutex );
fShouldStop.store( true, std::memory_order_relaxed );
FWE_LOG_TRACE( "Request stop" );
fWait.notify();
fThread.release();
initLogStructure();
fMetricsRoot.clear();
FWE_LOG_TRACE( "Stop finished" );
fShouldStop.store( false, std::memory_order_relaxed );
return !fThread.isActive();
}
void
RemoteProfiler::collectExecutionEnvironmentMetrics()
{
CPUUsageInfo lastUsage = fLastCPURUsage;
fLastCPURUsage.reportCPUUsageInfo();
Timestamp currentTime = fClock->monotonicTimeSinceEpochMs();
double secondsBetweenCollection =
static_cast<double>( currentTime - fLastTimeExecutionEnvironmentMetricsCollected ) / 1000.0;
fLastTimeExecutionEnvironmentMetricsCollected = currentTime;
double totalCPUPercentage = fLastCPURUsage.getCPUPercentage( lastUsage, secondsBetweenCollection );
fMemoryUsage.reportMemoryUsageInfo();
setMetric( "MemoryMaxResidentRam", static_cast<double>( fMemoryUsage.getMaxResidentMemorySize() ), "Bytes" );
setMetric( "MemoryCurrentResidentRam", static_cast<double>( fMemoryUsage.getResidentMemorySize() ), "Bytes" );
setMetric( "CpuPercentageSum", totalCPUPercentage, "Percent" );
CPUUsageInfo::ThreadCPUUsageInfos threadStatsPrevious = fLastThreadUsage;
CPUUsageInfo::reportPerThreadUsageData( fLastThreadUsage );
for ( auto currentThreadCPUUsageInfo : fLastThreadUsage )
{
for ( auto previousThreadCPUUsageInfo : threadStatsPrevious )
{
if ( currentThreadCPUUsageInfo.threadId == previousThreadCPUUsageInfo.threadId )
{
setMetric(
std::string( "CpuThread_" ) + currentThreadCPUUsageInfo.threadName + "_" +
std::to_string( currentThreadCPUUsageInfo.threadId ),
currentThreadCPUUsageInfo.getCPUPercentage( previousThreadCPUUsageInfo, secondsBetweenCollection ),
"Percent" );
}
}
}
}
void
RemoteProfiler::doWork()
{
while ( !fShouldStop )
{
if ( ( fInitialUploadInterval == 0 ) && ( fInitialLogMaxInterval == 0 ) )
{
fWait.wait( Signal::WaitWithPredicate );
}
else
{
fWait.wait( static_cast<uint32_t>( std::min(
fInitialUploadInterval == 0 ? std::numeric_limits<uint32_t>::max() : fInitialUploadInterval,
fInitialLogMaxInterval == 0 ? std::numeric_limits<uint32_t>::max() : fInitialLogMaxInterval ) ) );
}
Timestamp currentTime = fClock->monotonicTimeSinceEpochMs();
if ( fShouldStop || ( ( fLastTimeMetricsSentOut + fInitialUploadInterval ) < currentTime ) )
{
fLastTimeMetricsSentOut = currentTime;
TraceModule::get().forwardAllMetricsToMetricsReceiver( this );
TraceModule::get().startNewObservationWindow( fInitialUploadInterval );
collectExecutionEnvironmentMetrics();
sendMetricsOut();
}
if ( fShouldStop || ( ( fLastTimeMLogsSentOut + fInitialLogMaxInterval ) < currentTime ) )
{
fLastTimeMLogsSentOut = currentTime;
sendLogsOut();
}
}
}
} // namespace IoTFleetWise
} // namespace Aws