public void purgeQueueAndPersist()

in src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java [116:188]


    /**
     * Drains {@code PerformanceAnalyzerMetrics.metricQueue} and, when Performance Analyzer is
     * enabled, persists the drained events to event log files.
     *
     * <ul>
     *   <li>If {@code PerformanceAnalyzerConfigAction.getInstance()} is {@code null}, nothing is
     *       done.
     *   <li>If Performance Analyzer is disabled, the queue is drained and its contents are
     *       discarded so that stale events are not written once it is enabled again.
     *   <li>Otherwise the drained events are split by {@code epoch} into the current time bucket
     *       and the next one. Current-bucket events go through {@code writeAndRotate}, next-bucket
     *       events are written to a tmp file, and every other event is counted as {@code
     *       WriterMetrics.STALE_METRICS} and dropped. Finally {@code cleanup()} is invoked.
     * </ul>
     */
    public void purgeQueueAndPersist() {
        // Drain the Queue, and if writer is enabled then persist to event log file.
        if (PerformanceAnalyzerConfigAction.getInstance() == null) {
            return;
        } else if (!controller.isPerformanceAnalyzerEnabled()) {
            // If PA is disabled, then we return as we don't want to generate
            // new files. But we also want to drain the queue so that when it is
            // enabled next, we don't have the current elements as they would be
            // old.
            if (!PerformanceAnalyzerMetrics.metricQueue.isEmpty()) {
                // The drained events are intentionally thrown away.
                List<Event> discarded = new ArrayList<>();
                PerformanceAnalyzerMetrics.metricQueue.drainTo(discarded);
                // Note the trailing space before the concatenation: without it the message
                // read "Drained thequeue".
                LOG.info(
                        "Performance Analyzer no longer enabled. Drained the "
                                + "queue to remove stale data.");
            }
            return;
        }

        LOG.debug("Starting to purge the queue.");
        List<Event> metrics = new ArrayList<>();
        PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics);
        LOG.debug("Queue draining successful.");

        long currentTimeMillis = System.currentTimeMillis();

        // Calculate the timestamp on the file. For example, let's say the
        // purging started at time 12.5 then all the events between 5-10
        // are written to a file with name 5.
        long timeBucket =
                PerformanceAnalyzerMetrics.getTimeInterval(
                                currentTimeMillis, MetricsConfiguration.SAMPLING_INTERVAL)
                        - MetricsConfiguration.SAMPLING_INTERVAL;

        // When we are trying to collect the metrics for the 5th-10th second,
        // but doing that in the 12.5th second, there is a chance that a
        // collector ran in the 11th second and pushed the metrics in the
        // queue. This thread should be able to filter them and write them
        // to their appropriate file, which should be 10 and not 5.
        long nextTimeBucket = timeBucket + MetricsConfiguration.SAMPLING_INTERVAL;

        List<Event> currMetrics = new ArrayList<>();
        List<Event> nextMetrics = new ArrayList<>();

        for (Event entry : metrics) {
            if (entry.epoch == timeBucket) {
                currMetrics.add(entry);
            } else if (entry.epoch == nextTimeBucket) {
                nextMetrics.add(entry);
            } else {
                // The event belongs to neither the current nor the next bucket: count it as
                // a stale metric and drop it.
                PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
                        WriterMetrics.STALE_METRICS, "", 1);
            }
        }

        LOG.debug("Start serializing and writing to file.");
        writeAndRotate(currMetrics, timeBucket, currentTimeMillis);
        if (!nextMetrics.isEmpty()) {
            // The next bucket metrics don't need to be considered for
            // rotation just yet. So, we just write them to the
            // <nextTimeBucket>.tmp
            eventLogFileHandler.writeTmpFile(nextMetrics, nextTimeBucket);
        }
        LOG.debug("Writing to disk complete.");

        // Delete the older event log files every filesCleanupPeriod (defaults to 60)
        // In case files deletion takes longer/fails, we are okay with eventQueue reaching
        // its max size (100000), post that {@link PerformanceAnalyzerMetrics#emitMetric()}
        // will emit metric {@link WriterMetrics#METRICS_WRITE_ERROR} and return.
        cleanup();
    }