ssiog/metrics_logger.py (52 lines of code) (raw):
#!/usr/bin/env python3
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import queue
import time
from queue import Queue
from threading import Event
from threading import Thread
class AsyncMetricsLogger:
    """
    A class to asynchronously log metrics to a CSV file.

    Metrics handed to ``log_metric`` are placed on an in-memory queue and
    written in batches by a background daemon thread, so the caller never
    performs file I/O itself. ``close`` must be called to write whatever is
    still queued and to stop the thread.

    Args:
        file_name: Path of the CSV file. The writer thread opens it in "w"
            mode, so existing content is truncated.
        flush_interval: Seconds the writer thread waits between batches.
    """

    def __init__(self, file_name="metrics.csv", flush_interval=5):
        self.file_name = file_name
        self.flush_interval = flush_interval
        self.queue = Queue()
        # Set by close(). The writer thread waits on it between batches, so
        # a shutdown request interrupts the pause instead of waiting it out.
        self._stop_event = Event()
        self.writer_thread = Thread(target=self._writer_loop, daemon=True)
        self.writer_thread.start()

    def _write_pending(self, writer, csvfile):
        """
        Writes every metric currently in the queue and flushes the file.

        Does nothing (no write, no flush) when the queue is empty.
        """
        metrics = []
        try:
            while True:
                metrics.append(self.queue.get_nowait())
        except queue.Empty:
            pass
        if metrics:
            writer.writerows(metrics)
            csvfile.flush()

    def _writer_loop(self):
        """
        Body of the writer thread: writes the header row, then one batch per
        flush interval until shutdown is requested.
        """
        with open(self.file_name, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["timestamp", "sample_lat"])
            while True:
                self._write_pending(writer, csvfile)
                # wait() returns True as soon as close() sets the event and
                # False when the interval simply elapsed.
                if self._stop_event.wait(self.flush_interval):
                    # Get all remaining metrics from the queue on shutdown.
                    self._write_pending(writer, csvfile)
                    break

    def log_metric(self, sample_lat):
        """
        Logs a metric data point asynchronously.

        The value is queued together with the current ``time.time()``
        timestamp; the writer thread writes it to the file later.
        """
        timestamp = time.time()
        self.queue.put([timestamp, sample_lat])

    def close(self):
        """
        Signals the writer thread to shut down and flushes any remaining metrics.

        Returns once the writer thread has exited. The thread is woken
        immediately rather than after the current flush interval expires.
        """
        self._stop_event.set()  # Wake the writer and request shutdown
        self.writer_thread.join()  # Wait for the thread to finish
class NoOpMetricsLogger:
    """
    A no-op metrics logger that mimics the behavior of a real metrics logger
    but doesn't actually record any metrics. Useful for testing or
    disabling metrics without code changes.

    The constructor accepts the same arguments as ``AsyncMetricsLogger`` so
    the two classes can be swapped at a call site; no file is opened and no
    thread is started.
    """

    def __init__(self, file_name="metrics.csv", flush_interval=5):
        pass  # Ignore any arguments

    def log_metric(self, sample_lat):
        """Accepts a metric data point and discards it."""
        pass  # Do nothing when log_metric is called

    def close(self):
        """Does nothing; there is nothing to flush or shut down."""
        pass  # Do nothing when close is called