Gems/AWSMetrics/Code/Source/MetricsManager.cpp

/*
 * Copyright (c) Contributors to the Open 3D Engine Project.
 * For complete copyright and license terms please see the LICENSE at the root of this distribution.
 *
 * SPDX-License-Identifier: Apache-2.0 OR MIT
 *
 */

#include <AWSMetricsBus.h>
#include <AWSMetricsConstant.h>
#include <AWSMetricsServiceApi.h>
#include <ClientConfiguration.h>
#include <DefaultClientIdProvider.h>
#include <MetricsEvent.h>
#include <MetricsEventBuilder.h>
#include <MetricsManager.h>

#include <AzCore/Jobs/JobFunction.h>
#include <AzCore/IO/FileIO.h>
#include <AzCore/Math/MathUtils.h>
#include <AzCore/std/smart_ptr/make_shared.h>

namespace AWSMetrics
{
    MetricsManager::MetricsManager()
        : m_clientConfiguration(AZStd::make_unique<ClientConfiguration>())
        , m_clientIdProvider(IdentityProvider::CreateIdentityProvider())
        , m_monitorTerminated(true)
        , m_sendMetricsId(0)
    {
    }

    MetricsManager::~MetricsManager()
    {
        ShutdownMetrics();
    }

    bool MetricsManager::Init()
    {
        if (!m_clientConfiguration->InitClientConfiguration())
        {
            return false;
        }

        SetupJobContext();

        return true;
    }

    void MetricsManager::StartMetrics()
    {
        if (!m_monitorTerminated)
        {
            // The background thread has been started.
            return;
        }

        m_monitorTerminated = false;

        // Start a separate thread to monitor and consume the metrics queue.
        // Avoid using the job system since the worker is long-running over multiple frames.
        m_monitorThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this));
    }

    void MetricsManager::MonitorMetricsQueue()
    {
        // Continue to loop until the monitor is terminated.
        while (!m_monitorTerminated)
        {
            // The thread will wake up either when the metrics event queue is full (try_acquire_for call returns true),
            // or the flush period limit is hit (try_acquire_for call returns false).
            m_waitEvent.try_acquire_for(AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds()));

            FlushMetricsAsync();
        }
    }

    void MetricsManager::SetupJobContext()
    {
        // Avoid using the default job context since we will do blocking IO instead of CPU/memory intensive work.
        unsigned int numWorkerThreads = AZ::GetMin(DesiredMaxWorkers, AZStd::thread::hardware_concurrency());

        AZ::JobManagerDesc jobDesc;
        AZ::JobManagerThreadDesc threadDesc;
        for (unsigned int i = 0; i < numWorkerThreads; ++i)
        {
            jobDesc.m_workerThreads.push_back(threadDesc);
        }

        m_jobManager.reset(aznew AZ::JobManager{ jobDesc });
        m_jobContext.reset(aznew AZ::JobContext{ *m_jobManager });
    }

    bool MetricsManager::SubmitMetrics(const AZStd::vector<MetricsAttribute>& metricsAttributes, int eventPriority,
        const AZStd::string& eventSourceOverride)
    {
        MetricsEvent metricsEvent = MetricsEventBuilder().
            AddDefaultMetricsAttributes(m_clientIdProvider->GetIdentifier(), eventSourceOverride).
            AddMetricsAttributes(metricsAttributes).
            SetMetricsPriority(eventPriority).
            Build();

        if (!metricsEvent.ValidateAgainstSchema())
        {
            m_globalStats.m_numDropped++;
            return false;
        }

        AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
        m_metricsQueue.AddMetrics(metricsEvent);

        if (m_metricsQueue.GetSizeInBytes() >= static_cast<size_t>(m_clientConfiguration->GetMaxQueueSizeInBytes()))
        {
            // Flush the metrics queue when the accumulated metrics size hits the limit
            m_waitEvent.release();
        }

        return true;
    }
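    // Usage note: SubmitMetrics only buffers the event; the monitor thread sends the buffered queue on the
    // next flush period or once the queue size limit is hit, while the SendMetricsAsync overload below skips
    // the buffer and dispatches a request immediately. A minimal, illustrative call (the attribute name is
    // hypothetical and assumes MetricsAttribute can be constructed from a name/value pair):
    //
    //     AZStd::vector<MetricsAttribute> attributes;
    //     attributes.emplace_back(MetricsAttribute("event_name", "login"));
    //     metricsManager.SubmitMetrics(attributes, 0, "");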
    bool MetricsManager::SendMetricsAsync(const AZStd::vector<MetricsAttribute>& metricsAttributes, int eventPriority,
        const AZStd::string& eventSourceOverride)
    {
        MetricsEvent metricsEvent = MetricsEventBuilder().
            AddDefaultMetricsAttributes(m_clientIdProvider->GetIdentifier(), eventSourceOverride).
            AddMetricsAttributes(metricsAttributes).
            SetMetricsPriority(eventPriority).
            Build();

        if (!metricsEvent.ValidateAgainstSchema())
        {
            m_globalStats.m_numDropped++;
            return false;
        }

        auto metricsToFlush = AZStd::make_shared<MetricsQueue>();
        metricsToFlush->AddMetrics(metricsEvent);

        SendMetricsAsync(metricsToFlush);

        return true;
    }

    void MetricsManager::SendMetricsAsync(AZStd::shared_ptr<MetricsQueue> metricsQueue)
    {
        if (m_clientConfiguration->OfflineRecordingEnabled())
        {
            SendMetricsToLocalFileAsync(metricsQueue);
        }
        else
        {
            // Constant used to convert size limit from MB to Bytes.
            static constexpr int MbToBytes = 1000000;

            while (metricsQueue->GetNumMetrics() > 0)
            {
                // Break the metrics queue by the payload and records count limits.
                // Make one or more service API requests to send all the buffered metrics.
                MetricsQueue metricsEventsToProcess;
                metricsQueue->PopBufferedEventsByServiceLimits(metricsEventsToProcess,
                    AwsMetricsMaxRestApiPayloadSizeInMb * MbToBytes, AwsMetricsMaxKinesisBatchedRecordCount);

                SendMetricsToServiceApiAsync(metricsEventsToProcess);
            }
        }
    }

    void MetricsManager::SendMetricsToLocalFileAsync(AZStd::shared_ptr<MetricsQueue> metricsQueue)
    {
        int requestId = ++m_sendMetricsId;

        // Send metrics to a local file
        AZ::Job* job{ nullptr };
        job = AZ::CreateJobFunction(
            [this, metricsQueue, requestId]()
            {
                AZ::Outcome<void, AZStd::string> outcome = SendMetricsToFile(metricsQueue);
                if (outcome.IsSuccess())
                {
                    // Generate response records for the successful call to keep consistency with the Service API response.
                    ServiceAPI::PostMetricsEventsResponseEntries responseEntries;
                    int numMetricsEventsInRequest = metricsQueue->GetNumMetrics();
                    for (int index = 0; index < numMetricsEventsInRequest; ++index)
                    {
                        ServiceAPI::PostMetricsEventsResponseEntry responseEntry;
                        responseEntry.m_result = AwsMetricsPostMetricsEventsResponseEntrySuccessResult;
                        responseEntries.emplace_back(responseEntry);
                    }

                    OnResponseReceived(*metricsQueue, responseEntries);

                    AZ::TickBus::QueueFunction([requestId]()
                    {
                        AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsSuccess, requestId);
                    });
                }
                else
                {
                    OnResponseReceived(*metricsQueue);

                    AZStd::string errorMessage = outcome.GetError();
                    AZ::TickBus::QueueFunction([requestId, errorMessage]()
                    {
                        AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsFailure, requestId, errorMessage);
                    });
                }
            }, true, m_jobContext.get());

        job->Start();
    }

    void MetricsManager::SendMetricsToServiceApiAsync(const MetricsQueue& metricsQueue)
    {
        int requestId = ++m_sendMetricsId;

        ServiceAPI::PostMetricsEventsRequestJob* requestJob = ServiceAPI::PostMetricsEventsRequestJob::Create(
            [this, requestId](ServiceAPI::PostMetricsEventsRequestJob* successJob)
            {
                OnResponseReceived(successJob->parameters.m_metricsQueue, successJob->result.m_responseEntries);

                AZ::TickBus::QueueFunction([requestId]()
                {
                    AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsSuccess, requestId);
                });
            },
            [this, requestId](ServiceAPI::PostMetricsEventsRequestJob* failedJob)
            {
                OnResponseReceived(failedJob->parameters.m_metricsQueue);

                AZStd::string errorMessage = failedJob->error.message;
                AZ::TickBus::QueueFunction([requestId, errorMessage]()
                {
                    AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsFailure, requestId, errorMessage);
                });
            });

        requestJob->parameters.m_metricsQueue = AZStd::move(metricsQueue);
        requestJob->Start();
    }
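    // Both senders above report completion through AWSMetricsNotificationBus using the request id produced by
    // ++m_sendMetricsId. A sketch of a listener, assuming the notification bus follows the standard O3DE EBus
    // handler pattern (the handler class name is hypothetical):
    //
    //     class MetricsResponseHandler
    //         : protected AWSMetricsNotificationBus::Handler
    //     {
    //     public:
    //         MetricsResponseHandler() { AWSMetricsNotificationBus::Handler::BusConnect(); }
    //         ~MetricsResponseHandler() { AWSMetricsNotificationBus::Handler::BusDisconnect(); }
    //
    //         void OnSendMetricsSuccess(int requestId) override;
    //         void OnSendMetricsFailure(int requestId, const AZStd::string& errorMessage) override;
    //     };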
    void MetricsManager::OnResponseReceived(const MetricsQueue& metricsEventsInRequest,
        const ServiceAPI::PostMetricsEventsResponseEntries& responseEntries)
    {
        MetricsQueue metricsEventsForRetry;

        int numMetricsEventsInRequest = metricsEventsInRequest.GetNumMetrics();
        for (int index = 0; index < numMetricsEventsInRequest; ++index)
        {
            MetricsEvent metricsEvent = metricsEventsInRequest[index];

            if (responseEntries.size() > 0 && responseEntries[index].m_result == AwsMetricsPostMetricsEventsResponseEntrySuccessResult)
            {
                // The metrics event was sent to the backend successfully.
                if (metricsEvent.GetNumFailures() == 0)
                {
                    m_globalStats.m_numEvents++;
                }
                else
                {
                    // Reduce the number of errors when the retry succeeds.
                    m_globalStats.m_numErrors--;
                }

                m_globalStats.m_numSuccesses++;
                m_globalStats.m_sendSizeInBytes += static_cast<uint32_t>(metricsEvent.GetSizeInBytes());
            }
            else
            {
                metricsEvent.MarkFailedSubmission();

                // The metrics event failed to be sent to the backend for the first time.
                if (metricsEvent.GetNumFailures() == 1)
                {
                    m_globalStats.m_numErrors++;
                    m_globalStats.m_numEvents++;
                }

                if (metricsEvent.GetNumFailures() <= m_clientConfiguration->GetMaxNumRetries())
                {
                    metricsEventsForRetry.AddMetrics(metricsEvent);
                }
                else
                {
                    m_globalStats.m_numDropped++;
                }
            }
        }

        PushMetricsForRetry(metricsEventsForRetry);
    }

    void MetricsManager::PushMetricsForRetry(MetricsQueue& metricsEventsForRetry)
    {
        if (m_clientConfiguration->GetMaxNumRetries() == 0)
        {
            // No retry is required.
            m_globalStats.m_numDropped += metricsEventsForRetry.GetNumMetrics();
            return;
        }

        // Push failed events to the front of the queue and preserve the order.
        AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
        m_metricsQueue.PushMetricsToFront(metricsEventsForRetry);

        // Filter metrics events by priority since the queue might be full.
        m_globalStats.m_numDropped += m_metricsQueue.FilterMetricsByPriority(m_clientConfiguration->GetMaxQueueSizeInBytes());
    }

    AZ::Outcome<void, AZStd::string> MetricsManager::SendMetricsToFile(AZStd::shared_ptr<MetricsQueue> metricsQueue)
    {
        AZStd::lock_guard<AZStd::mutex> lock(m_metricsFileMutex);

        AZ::IO::FileIOBase* fileIO = AZ::IO::FileIOBase::GetDirectInstance();
        if (!fileIO)
        {
            return AZ::Failure(AZStd::string{ "No FileIoBase Instance." });
        }

        const char* metricsFileFullPath = m_clientConfiguration->GetMetricsFileFullPath();
        const char* metricsFileDir = m_clientConfiguration->GetMetricsFileDir();
        MetricsQueue existingMetricsEvents;
        if (!metricsFileFullPath || !metricsFileDir)
        {
            return AZ::Failure(AZStd::string{ "Failed to get the metrics file directory or path." });
        }

        if (fileIO->Exists(metricsFileFullPath) && !existingMetricsEvents.ReadFromJson(metricsFileFullPath))
        {
            return AZ::Failure(AZStd::string{ "Failed to read the existing metrics on disk" });
        }
        else if (!fileIO->Exists(metricsFileDir) && !fileIO->CreatePath(metricsFileDir))
        {
            return AZ::Failure(AZStd::string{ "Failed to create metrics directory" });
        }

        // Append a copy of the metrics queue in the request to the existing metrics events and keep the original submission order.
        // Do not modify the metrics queue in the request directly, so that its metrics events can still be identified for retry on failure.
        MetricsQueue metricsEventsInRequest = *metricsQueue;
        existingMetricsEvents.AppendMetrics(metricsEventsInRequest);

        AZStd::string serializedMetrics = existingMetricsEvents.SerializeToJson();

        AZ::IO::HandleType fileHandle;
        if (!fileIO->Open(metricsFileFullPath, AZ::IO::OpenMode::ModeWrite | AZ::IO::OpenMode::ModeText, fileHandle))
        {
            return AZ::Failure(AZStd::string{ "Failed to open metrics file" });
        }

        fileIO->Write(fileHandle, serializedMetrics.c_str(), serializedMetrics.size());
        fileIO->Close(fileHandle);

        return AZ::Success();
    }

    void MetricsManager::FlushMetricsAsync()
    {
        AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
        if (m_metricsQueue.GetNumMetrics() == 0)
        {
            return;
        }

        auto metricsToFlush = AZStd::make_shared<MetricsQueue>();
        metricsToFlush->AppendMetrics(m_metricsQueue);
        m_metricsQueue.ClearMetrics();

        SendMetricsAsync(metricsToFlush);
    }

    void MetricsManager::ShutdownMetrics()
    {
        if (m_monitorTerminated)
        {
            return;
        }

        // Terminate the monitor thread
        m_monitorTerminated = true;
        m_waitEvent.release();

        if (m_monitorThread.joinable())
        {
            m_monitorThread.join();
        }
    }

    AZ::s64 MetricsManager::GetNumBufferedMetrics()
    {
        AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
        return m_metricsQueue.GetNumMetrics();
    }

    const GlobalStatistics& MetricsManager::GetGlobalStatistics() const
    {
        return m_globalStats;
    }

    void MetricsManager::UpdateOfflineRecordingStatus(bool enable, bool submitLocalMetrics)
    {
        m_clientConfiguration->UpdateOfflineRecordingStatus(enable);

        if (!enable && submitLocalMetrics)
        {
            SubmitLocalMetricsAsync();
        }
    }

    void MetricsManager::SubmitLocalMetricsAsync()
    {
        AZ::Job* job{ nullptr };
        job = AZ::CreateJobFunction([this]()
        {
            AZ::IO::FileIOBase* fileIO = AZ::IO::FileIOBase::GetDirectInstance();
            if (!fileIO)
            {
                AZ_Error("AWSMetrics", false, "No FileIoBase Instance.");
                return;
            }

            // Read metrics from the local metrics file.
            AZStd::lock_guard<AZStd::mutex> file_lock(m_metricsFileMutex);
            if (!fileIO->Exists(GetMetricsFilePath()))
            {
                // Local metrics file doesn't exist.
                return;
            }

            MetricsQueue offlineRecords;
            if (!offlineRecords.ReadFromJson(GetMetricsFilePath()))
            {
                AZ_Error("AWSMetrics", false, "Failed to read from the local metrics file %s", GetMetricsFilePath());
                return;
            }

            // Submit the metrics read from the local metrics file.
            int numOfflineRecords = offlineRecords.GetNumMetrics();
            for (int index = 0; index < numOfflineRecords; ++index)
            {
                AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
                m_metricsQueue.AddMetrics(offlineRecords[index]);

                if (m_metricsQueue.GetSizeInBytes() >= static_cast<size_t>(m_clientConfiguration->GetMaxQueueSizeInBytes()))
                {
                    // Flush the metrics queue when the accumulated metrics size hits the limit
                    m_waitEvent.release();
                }
            }

            // Remove the local metrics file after reading all its content.
            if (!fileIO->Remove(GetMetricsFilePath()))
            {
                AZ_Error("AWSMetrics", false, "Failed to remove the local metrics file %s", GetMetricsFilePath());
                return;
            }
        }, true, m_jobContext.get());

        job->Start();
    }

    const char* MetricsManager::GetMetricsFileDirectory() const
    {
        return m_clientConfiguration->GetMetricsFileDir();
    }

    const char* MetricsManager::GetMetricsFilePath() const
    {
        return m_clientConfiguration->GetMetricsFileFullPath();
    }

    int MetricsManager::GetNumTotalRequests() const
    {
        return m_sendMetricsId.load();
    }
}
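A minimal lifecycle sketch for the class above, assuming a caller owns the manager directly (in the gem it is typically owned by the gem's system component) and that MetricsAttribute can be constructed from a name/value pair; the attribute name below is hypothetical:

    #include <MetricsManager.h>

    void RunMetricsSession()
    {
        AWSMetrics::MetricsManager metricsManager;
        if (!metricsManager.Init())
        {
            return; // Client configuration (queue size, flush period, retry and offline settings) failed to load.
        }

        // Start the monitor thread; it flushes the queue every flush period or when the queue size limit is hit.
        metricsManager.StartMetrics();

        // Buffer one metrics event with default priority and no source override.
        AZStd::vector<AWSMetrics::MetricsAttribute> attributes;
        attributes.emplace_back(AWSMetrics::MetricsAttribute("event_name", "session_start"));
        metricsManager.SubmitMetrics(attributes, 0, "");

        // Flush immediately instead of waiting for the next flush period, then stop the monitor thread.
        metricsManager.FlushMetricsAsync();
        metricsManager.ShutdownMetrics();
    }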