pipeline/common/memory.py (34 lines of code) (raw):
import gc
import logging
import os
from typing import Optional
import psutil
from pipeline.common import format_bytes
from pipeline.common.logging import get_logger
# Module-level state, all created lazily on first use.
# Logger used by log_memory(); stays None until log_memory() is first called.
_memory_logger: Optional[logging.Logger] = None
# psutil handle for this process; created by get_memory_string() on first call.
_memory_process: Optional[psutil.Process] = None
# RSS (in bytes) recorded by the previous get_memory_string() call, used to
# compute the delta shown in the next call. None until a first measurement.
_memory_last_bytes: Optional[int] = None
def get_memory_string() -> str:
    """
    Get the current string representation of the memory usage.

    The resident set size (RSS) of the current process is read through psutil
    and formatted with format_bytes. On every call after the first, the change
    since the previous call is appended in parentheses. The measurement is
    stored in module state so the next call can report its own delta.

    Returns:
        The formatted RSS, followed by the formatted difference to the
        previous measurement when one exists.
    """
    global _memory_process
    global _memory_last_bytes

    # Lazily initialize the process handle. It is also rebuilt when the cached
    # PID no longer matches the running process, which happens in a child that
    # inherited this module's state through fork(); the stale handle would
    # otherwise keep reporting the parent's memory.
    current_pid = os.getpid()
    if _memory_process is None or _memory_process.pid != current_pid:
        _memory_process = psutil.Process(current_pid)
        # Any earlier measurement belongs to a different process.
        _memory_last_bytes = None

    rss_bytes = _memory_process.memory_info().rss

    # Compare against None explicitly so a recorded value of 0 still counts
    # as a previous measurement.
    if _memory_last_bytes is None:
        string = format_bytes(rss_bytes)
    else:
        bytes_diff = rss_bytes - _memory_last_bytes
        # Only non-negative deltas get an explicit "+"; a negative delta is
        # handed to format_bytes unchanged.
        sign = "+" if bytes_diff >= 0 else ""
        string = f"{format_bytes(rss_bytes)} ({sign}{format_bytes(bytes_diff)})"

    _memory_last_bytes = rss_bytes
    return string
def log_memory(gc_collect: bool = False) -> None:
    """
    Write the current process's memory usage to the "memory" logger.

    Args:
        gc_collect: When true, run a garbage collection pass before the
            memory usage is measured.
    """
    global _memory_logger

    # Create the logger on first use and reuse it afterwards.
    _memory_logger = _memory_logger or get_logger("memory")

    if gc_collect:
        gc.collect()

    message = get_memory_string()
    _memory_logger.info(message)