runmetrics/stats_collector.py (121 lines of code) (raw):
import gc
import logging
import multiprocessing as mp
import os
import platform
import resource
import sys
import threading
import time
from typing import Dict, Optional, Union
from spectator.meter.gauge import Gauge
from spectator.meter.monotonic_counter import MonotonicCounter
from spectator.registry import Registry
try:
import psutil
except ImportError:
pass
class StatsCollector:
def __init__(self, registry: Registry, worker_id: Optional[str] = None, period: int = 30):
self._registry = registry
self._logger = logging.getLogger(__name__)
self._enabled = True
self._period = period
self._worker_id = worker_id
# file descriptor metrics
self._fd_allocated = self._registry.gauge("py.fd", self._with_worker_id({"id": "allocated"}))
self._fd_max = self._registry.gauge("py.fd", self._with_worker_id({"id": "max"}))
# garbage collector metrics
self._gc_enabled = self._registry.gauge("py.gc.enabled", self._with_worker_id({}))
self._gc_gen: Dict[int, Dict[str, Union[Gauge, MonotonicCounter]]] = {}
for i in range(0, 3):
self._gc_gen[i] = {}
self._gc_gen[i]["collections"] = self._registry.monotonic_counter("py.gc.collections", self._with_worker_id({"gen": f"{i}"}))
self._gc_gen[i]["collected"] = self._registry.monotonic_counter("py.gc.collected", self._with_worker_id({"gen": f"{i}"}))
self._gc_gen[i]["uncollectable"] = self._registry.monotonic_counter("py.gc.uncollectable", self._with_worker_id({"gen": f"{i}"}))
self._gc_gen[i]["threshold"] = self._registry.gauge("py.gc.threshold", self._with_worker_id({"gen": f"{i}"}))
self._gc_gen[i]["count"] = self._registry.gauge("py.gc.count", self._with_worker_id({"gen": f"{i}"}))
self._gc_pause = self._registry.timer("py.gc.pause", self._with_worker_id({}))
self._gc_time_since_last = self._registry.age_gauge("py.gc.timeSinceLast", self._with_worker_id({}))
if platform.python_implementation() == "CPython":
gc.callbacks.append(self._gc_callback)
# multiprocessing and os metrics
self._mp_active_children = self._registry.gauge("py.mp.activeChildren", self._with_worker_id({}))
self._os_cpu = self._registry.gauge("py.os.cpu", self._with_worker_id({}))
# resource usage metrics
self._ru_utime = self._registry.monotonic_counter("py.resource.time", self._with_worker_id({"mode": "user"}))
self._ru_stime = self._registry.monotonic_counter("py.resource.time", self._with_worker_id({"mode": "system"}))
self._ru_maxrss = self._registry.gauge("py.resource.maxResidentSetSize", self._with_worker_id({}))
self._ru_minflt = self._registry.monotonic_counter("py.resource.pageFaults", self._with_worker_id({"io.required": "false"}))
self._ru_majflt = self._registry.monotonic_counter("py.resource.pageFaults", self._with_worker_id({"io.required": "true"}))
self._ru_inblock = self._registry.monotonic_counter("py.resource.blockOperations", self._with_worker_id({"id": "input"}))
self._ru_oublock = self._registry.monotonic_counter("py.resource.blockOperations", self._with_worker_id({"id": "output"}))
self._ru_nvcsw = self._registry.monotonic_counter("py.resource.contextSwitches", self._with_worker_id({"id": "voluntary"}))
self._ru_nivcsw = self._registry.monotonic_counter("py.resource.contextSwitches", self._with_worker_id({"id": "involuntary"}))
# threading metrics
self._threading_active = self._registry.gauge("py.threading.active", self._with_worker_id({}))
def _with_worker_id(self, tags: Dict[str, str]) -> Optional[Dict[str, str]]:
if self._worker_id is not None:
tags.update({"worker.id": f"{self._worker_id}"})
if len(tags) == 0:
return None
else:
return tags
def _gc_callback(self, phase: str, info: Dict[str, int]) -> None:
if phase == "start":
self._gc_time_since_last.now()
self._gc_start = time.monotonic()
else:
self._gc_pause.record(time.monotonic() - self._gc_start)
def _target(self):
self._logger.info("start collecting runtime metrics every %s seconds", self._period)
while self._enabled:
self._logger.debug("collect runtime metrics")
self.collect_stats()
time.sleep(self._period)
self._logger.info("stop collecting runtime metrics")
def collect_stats(self):
self._collect_fd_stats()
self._collect_gc_stats()
self._collect_mp_os_stats()
self._collect_resource_stats()
self._collect_threading_stats()
def _collect_fd_stats(self):
self._fd_max.set(resource.RLIMIT_NOFILE)
if sys.platform == "win32":
self._fd_allocated.set(psutil.Process().num_handles())
elif sys.platform == "darwin":
self._fd_allocated.set(len(os.listdir("/dev/fd")))
elif sys.platform == "linux":
self._fd_allocated.set(len(os.listdir("/proc/self/fd")))
def _collect_gc_stats(self):
if gc.isenabled():
self._gc_enabled.set(1)
else:
self._gc_enabled.set(0)
stats = gc.get_stats()
threshold = gc.get_threshold()
count = gc.get_count()
for i in range(0, 3):
self._gc_gen[i]["collections"].set(stats[i]["collections"])
self._gc_gen[i]["collected"].set(stats[i]["collected"])
self._gc_gen[i]["uncollectable"].set(stats[i]["uncollectable"])
self._gc_gen[i]["threshold"].set(threshold[i])
self._gc_gen[i]["count"].set(count[i])
def _collect_mp_os_stats(self):
self._mp_active_children.set(len(mp.active_children()))
self._os_cpu.set(os.cpu_count())
def _collect_resource_stats(self):
rusage = resource.getrusage(resource.RUSAGE_SELF)
self._ru_utime.set(rusage.ru_utime)
self._ru_stime.set(rusage.ru_stime)
self._ru_maxrss.set(rusage.ru_maxrss)
self._ru_minflt.set(rusage.ru_minflt)
self._ru_majflt.set(rusage.ru_majflt)
self._ru_inblock.set(rusage.ru_inblock)
self._ru_oublock.set(rusage.ru_oublock)
self._ru_nvcsw.set(rusage.ru_nvcsw)
self._ru_nivcsw.set(rusage.ru_nivcsw)
def _collect_threading_stats(self):
self._threading_active.set(threading.active_count())
def start(self):
self._enabled = True
t = threading.Thread(target=self._target, daemon=True)
t.start()
def stop(self):
self._enabled = False