Gems/AWSMetrics/Code/Source/MetricsQueue.cpp (178 lines of code) (raw):
/*
* Copyright (c) Contributors to the Open 3D Engine Project.
* For complete copyright and license terms please see the LICENSE at the root of this distribution.
*
* SPDX-License-Identifier: Apache-2.0 OR MIT
*
*/
#include <MetricsQueue.h>
#include <Framework/JsonWriter.h>
#include <AzCore/JSON/prettywriter.h>
#include <AzCore/JSON/stringbuffer.h>
#include <AzCore/IO/FileIO.h>
#include <AzCore/IO/Path/Path.h>
#include <AzCore/Serialization/Json/JsonSerialization.h>
#include <AzCore/Serialization/Json/JsonUtils.h>
#include <AzCore/std/sort.h>
#include <sstream>
namespace AWSMetrics
{
const MetricsEvent& MetricsQueue::operator[](int index) const
{
AZ_Assert(index >= 0 && index < m_metrics.size(), "Index for the metrics queue is out of range.");
return m_metrics[index];
}
void MetricsQueue::AddMetrics(const MetricsEvent& metrics)
{
m_sizeSerializedToJson += metrics.GetSizeInBytes();
m_metrics.emplace_back(AZStd::move(metrics));
}
void MetricsQueue::AppendMetrics(MetricsQueue& metricsQueue)
{
if (metricsQueue.GetNumMetrics() == 0)
{
return;
}
m_sizeSerializedToJson += metricsQueue.GetSizeInBytes();
if (m_metrics.size() == 0)
{
m_metrics = AZStd::move(metricsQueue.m_metrics);
}
else
{
AZStd::move(metricsQueue.m_metrics.begin(), metricsQueue.m_metrics.end(), std::back_inserter(m_metrics));
}
}
void MetricsQueue::PushMetricsToFront(MetricsQueue& metricsQueue)
{
if (metricsQueue.GetNumMetrics() == 0)
{
return;
}
m_sizeSerializedToJson += metricsQueue.GetSizeInBytes();
if (m_metrics.size() == 0)
{
m_metrics = AZStd::move(metricsQueue.m_metrics);
}
else
{
AZStd::move(metricsQueue.m_metrics.begin(), metricsQueue.m_metrics.end(), std::front_inserter(m_metrics));
}
}
int MetricsQueue::FilterMetricsByPriority(size_t maxSizeInBytes)
{
if (GetSizeInBytes() < maxSizeInBytes)
{
return 0;
}
int numCurrentMetricsEvents = GetNumMetrics();
AZStd::vector<AZStd::pair<const MetricsEvent*, int>> sortedMetrics;
for (int index = 0; index < numCurrentMetricsEvents; ++index)
{
sortedMetrics.emplace_back(AZStd::pair<const MetricsEvent*, int>(&m_metrics[index], index));
}
// Sort the existing metrics event by event priority.
// We need to reverse the relative order of the metrics events with the same priority to
// avoid newer events being discarded when the max size capacity is reached.
AZStd::stable_sort(sortedMetrics.begin(), sortedMetrics.end(),
[](const AZStd::pair<const MetricsEvent*, int>& left, const AZStd::pair<const MetricsEvent*, int>& right)
{
if (left.first->GetEventPriority() == right.first->GetEventPriority())
{
return left.second > right.second;
}
else
{
return left.first->GetEventPriority() < right.first->GetEventPriority();
}
}
);
AZStd::deque<MetricsEvent> result;
m_sizeSerializedToJson = 0;
for (const AZStd::pair<const MetricsEvent*, int>& pair : sortedMetrics)
{
if (m_sizeSerializedToJson < maxSizeInBytes)
{
m_sizeSerializedToJson += pair.first->GetSizeInBytes();
result.emplace_back(AZStd::move(*(pair.first)));
}
else
{
break;
}
}
m_metrics.clear();
m_metrics = AZStd::move(result);
return numCurrentMetricsEvents - GetNumMetrics();
}
void MetricsQueue::ClearMetrics()
{
m_sizeSerializedToJson = 0;
m_metrics.clear();
}
int MetricsQueue::GetNumMetrics() const
{
return static_cast<int>(m_metrics.size());
}
size_t MetricsQueue::GetSizeInBytes() const
{
return m_sizeSerializedToJson;
}
AZStd::string MetricsQueue::SerializeToJson()
{
std::stringstream stringStream;
AWSCore::JsonOutputStream jsonStream{stringStream};
AWSCore::JsonWriter writer{jsonStream};
SerializeToJson(writer);
return stringStream.str().c_str();
}
bool MetricsQueue::SerializeToJson(AWSCore::JsonWriter& writer) const
{
bool ok = true;
ok = ok && writer.StartArray();
for (auto& metrics : m_metrics)
{
ok = ok && metrics.SerializeToJson(writer);
}
ok = ok && writer.EndArray();
return ok;
}
void MetricsQueue::PopBufferedEventsByServiceLimits(MetricsQueue& bufferedEvents, int maxPayloadSizeInBytes, int maxBatchedRecordsCount)
{
int curNum = 0;
int curSizeInBytes = 0;
while (m_metrics.size() > 0)
{
MetricsEvent& curEvent = m_metrics.front();
curNum += 1;
curSizeInBytes += static_cast<int>(curEvent.GetSizeInBytes());
if (curNum <= maxBatchedRecordsCount && curSizeInBytes <= maxPayloadSizeInBytes)
{
m_sizeSerializedToJson -= curEvent.GetSizeInBytes();
bufferedEvents.AddMetrics(curEvent);
m_metrics.pop_front();
}
else
{
break;
}
}
}
bool MetricsQueue::ReadFromJson(const AZStd::string& filePath)
{
auto result = AZ::JsonSerializationUtils::ReadJsonFile(filePath);
if (!result.IsSuccess() ||!ReadFromJsonDocument(result.GetValue()))
{
AZ_Error("AWSMetrics", false, "Failed to read metrics file %s", filePath.c_str());
return false;
}
return true;
}
bool MetricsQueue::ReadFromJsonDocument(rapidjson::Document& doc)
{
if (!doc.IsArray())
{
return false;
}
for (rapidjson::SizeType metricsIndex = 0; metricsIndex < doc.Size(); metricsIndex++)
{
MetricsEvent metrics;
if (!metrics.ReadFromJson(doc[metricsIndex]))
{
return false;
}
// Read through each element in the array and add it as a new metrics
AddMetrics(metrics);
}
return true;
}
}