in src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java [116:188]
public void purgeQueueAndPersist() {
// Drain the queue and, if the writer is enabled, persist the events to the event log file.
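// If the config action has not been registered yet, we cannot tell whether PA is
// enabled, so skip this cycle.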
if (PerformanceAnalyzerConfigAction.getInstance() == null) {
return;
} else if (!controller.isPerformanceAnalyzerEnabled()) {
// If PA is disabled, we return without generating new files. We still
// drain the queue so that when PA is enabled again, these now-stale
// elements are not processed.
if (PerformanceAnalyzerMetrics.metricQueue.size() > 0) {
List<Event> metrics = new ArrayList<>();
PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics);
LOG.info(
"Performance Analyzer no longer enabled. Drained the"
+ " queue to remove stale data.");
}
return;
}
LOG.debug("Starting to purge the queue.");
List<Event> metrics = new ArrayList<>();
PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics);
LOG.debug("Queue draining successful.");
long currentTimeMillis = System.currentTimeMillis();
// Calculate the timestamp for the file. For example, if purging starts
// at time 12.5, then all the events between seconds 5 and 10 are
// written to a file named 5.
long timeBucket =
PerformanceAnalyzerMetrics.getTimeInterval(
currentTimeMillis, MetricsConfiguration.SAMPLING_INTERVAL)
- MetricsConfiguration.SAMPLING_INTERVAL;
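// Worked example (assuming a 5s, i.e. 5000 ms, sampling interval): for a purge at
// 12,500 ms, getTimeInterval(12500, 5000) floors to 10,000, so
// timeBucket = 10,000 - 5,000 = 5,000, matching the "file with name 5" example above.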
// When we are collecting the metrics for the 5th-10th second window,
// but doing so at the 12.5th second, a collector may have run at the
// 11th second and pushed metrics into the queue. This thread should
// filter those out and write them to their appropriate file, which is
// 10 and not 5.
long nextTimeBucket = timeBucket + MetricsConfiguration.SAMPLING_INTERVAL;
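// Split the drained events into those belonging to the bucket being finalized
// and those that already belong to the next, still-open bucket.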
List<Event> currMetrics = new ArrayList<>();
List<Event> nextMetrics = new ArrayList<>();
for (Event entry : metrics) {
if (entry.epoch == timeBucket) {
currMetrics.add(entry);
} else if (entry.epoch == nextTimeBucket) {
nextMetrics.add(entry);
} else {
// Increment the stale_metrics count for events whose timestamps fall
// behind the current bucket; they are dropped rather than written.
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.STALE_METRICS, "", 1);
}
}
LOG.debug("Start serializing and writing to file.");
writeAndRotate(currMetrics, timeBucket, currentTimeMillis);
if (!nextMetrics.isEmpty()) {
// The next bucket metrics don't need to be considered for
// rotation just yet. So, we just write them to the
// <nextTimeBucket>.tmp
eventLogFileHandler.writeTmpFile(nextMetrics, nextTimeBucket);
}
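// These early arrivals remain in <nextTimeBucket>.tmp; they should be finalized
// on a later purge cycle once that bucket closes.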
LOG.debug("Writing to disk complete.");
// Delete older event log files every filesCleanupPeriod (defaults to 60).
// If file deletion takes longer or fails, we are okay with the event queue reaching
// its max size (100000); after that {@link PerformanceAnalyzerMetrics#emitMetric()}
// emits the metric {@link WriterMetrics#METRICS_WRITE_ERROR} and returns.
cleanup();
}