pipeline/common/logging.py (80 lines of code) (raw):
import logging
from pathlib import Path
import subprocess
import threading
import time
# Configure the root handler at import time so every logger prints as "[name] message".
logging.basicConfig(format="[%(name)s] %(message)s", level=logging.INFO)

# Stop requests for the background loggers below: a stop_*() call sets the flag, and the
# logger thread clears it again once it wakes up, notices the request, and exits.
STOP_BYTE_COUNT_LOGGER: bool = False
STOP_GPU_LOGGER: bool = False
def get_logger(name: str):
    """
    Return the INFO-level logger named after the file stem of ``name``.

    Pass ``__file__`` so log lines are prefixed with the module's file name. For example,
    in pipeline/bicleaner/download_pack.py:

        logger = get_logger(__file__)
        logger.info("This is a log.")

    logs:

        > [download_pack] This is a log.
    """
    logger_name = Path(name).stem
    module_logger = logging.getLogger(logger_name)
    module_logger.setLevel(logging.INFO)
    return module_logger
def _log_gpu_stats(logger: logging.Logger, interval_seconds: int):
    """
    Thread target for start_gpu_logging().

    Every ``interval_seconds``, log a header and dump the current GPU stats with gpustat.
    Runs until stop_gpu_logging() sets STOP_GPU_LOGGER. The thread clears the flag again
    before returning.
    """
    # Only load gpustat when it's needed.
    import gpustat

    global STOP_GPU_LOGGER
    while True:
        time.sleep(interval_seconds)
        if STOP_GPU_LOGGER:
            # Consume the stop request so start_gpu_logging() can be called again.
            STOP_GPU_LOGGER = False
            return
        try:
            logger.info("[gpu] Current GPU stats:")
            # NOTE(review): print_gpustat presumably writes to stdout directly rather than
            # through ``logger`` — confirm if the output must go to the log.
            gpustat.print_gpustat()
        except Exception as e:
            # gpustat is not guaranteed to report failures as CalledProcessError. Catching
            # only that let any other error kill this thread, and a dead thread never clears
            # STOP_GPU_LOGGER, which blocks every later start_gpu_logging(). Log and keep
            # going instead, as the byte count logger does.
            logger.error(f"Failed to retrieve GPU stats: {e}")
def stop_gpu_logging():
    """
    Ask the background GPU logger to exit.

    The logger thread only checks the flag after each sleep, so it can keep running for up
    to one interval. Join the thread returned by start_gpu_logging() to wait for it.
    NOTE: if no GPU logger is running, nothing clears the flag, and start_gpu_logging()
    will refuse to start.
    """
    global STOP_GPU_LOGGER
    STOP_GPU_LOGGER = True


def start_gpu_logging(logger: logging.Logger, interval_seconds: int):
    """
    Log GPU stats every ``interval_seconds`` using gpustat in a background thread.

    Returns the started thread. After stop_gpu_logging(), callers can ``join()`` it before
    starting a new logger. Callers that ignore the return value are unaffected.
    """
    # The stop flag stays set until the previous logger thread wakes up and clears it, so a
    # set flag means an earlier logger is still shutting down.
    assert not STOP_GPU_LOGGER, "A gpu logger should not already be running"
    thread = threading.Thread(
        target=_log_gpu_stats,
        args=(logger, interval_seconds),
        name="gpu-logger",
        # Set as a daemon thread so it automatically is closed on shutdown.
        daemon=True,
    )
    thread.start()
    return thread
def _current_byte_count(file_path: Path) -> int:
    """Return how many bytes ``file_path`` currently holds (decompressed for ``.zst``)."""
    if file_path.suffix == ".zst":
        # Decompress and count the output. This takes ~1 second to run on 5 million
        # sentences. zstd's exit status is not checked, so a partially written file still
        # yields the bytes decoded so far.
        byte_count = 0
        with subprocess.Popen(
            ["zstd", "-dc", str(file_path)],
            stdout=subprocess.PIPE,
            # Nothing reads stderr, so don't give zstd a pipe that could fill up and stall it.
            stderr=subprocess.DEVNULL,
        ) as process:
            for chunk in iter(lambda: process.stdout.read(1 << 16), b""):
                byte_count += len(chunk)
        return byte_count
    # Uncompressed: the on-disk size is the byte count (no need to spawn `wc -c`).
    return file_path.stat().st_size


def _log_byte_rate(logger: logging.Logger, interval_seconds: int, file_path: Path):
    """
    Thread target for start_byte_count_logger().

    Every ``interval_seconds``, log how many bytes ``file_path`` gained since the previous
    report, its total size, and the rate in bytes/second. Runs until
    stop_byte_count_logger() sets STOP_BYTE_COUNT_LOGGER. The thread clears the flag again
    before returning. Measurement errors are logged and the loop keeps running.
    """
    global STOP_BYTE_COUNT_LOGGER
    # Callers may pass a plain string despite the annotation; the helpers need a Path.
    file_path = Path(file_path)
    previous_byte_count = 0
    # monotonic() cannot jump like wall-clock time, so elapsed intervals stay meaningful.
    previous_time = time.monotonic()
    while True:
        time.sleep(interval_seconds)
        if STOP_BYTE_COUNT_LOGGER:
            # Consume the stop request so start_byte_count_logger() can be called again.
            STOP_BYTE_COUNT_LOGGER = False
            return
        try:
            current_byte_count = _current_byte_count(file_path)
            current_time = time.monotonic()
            bytes_added = current_byte_count - previous_byte_count
            elapsed_secs = current_time - previous_time
            # A shrinking file reports 0 rather than a negative rate, and a zero-length
            # interval (e.g. interval_seconds=0 on a coarse clock) must not divide by zero.
            if bytes_added > 0 and elapsed_secs > 0:
                byte_rate = bytes_added / elapsed_secs
            else:
                byte_rate = 0
            logger.info(f"[bytes] Added: {bytes_added:,}")
            logger.info(f"[bytes] Total: {current_byte_count:,}")
            logger.info(f"[bytes] Rate: {byte_rate:,.2f} bytes/second")
            previous_byte_count = current_byte_count
            # Measure the next interval from when this count was taken.
            previous_time = current_time
        except Exception as e:
            # Best-effort monitoring: keep the background thread alive on any failure.
            logger.error(f"Failed to monitor byte count: {e}")
def stop_byte_count_logger():
    """
    Ask the background byte count logger to exit.

    The logger thread only checks the flag after each sleep, so it can keep running for up
    to one interval. Join the thread returned by start_byte_count_logger() to wait for it.
    NOTE: if no byte count logger is running, nothing clears the flag, and
    start_byte_count_logger() will refuse to start.
    """
    global STOP_BYTE_COUNT_LOGGER
    STOP_BYTE_COUNT_LOGGER = True


def start_byte_count_logger(logger: logging.Logger, interval_seconds: int, file_path: Path):
    """
    Monitors the rate of bytes being added to a file, logging the number of bytes
    added per second over the interval. For ``.zst`` files the decompressed size is
    counted. Monitoring runs in a daemon background thread.

    Returns the started thread. After stop_byte_count_logger(), callers can ``join()`` it
    before starting a new logger. Callers that ignore the return value are unaffected.
    """
    # The stop flag stays set until the previous logger thread wakes up and clears it, so a
    # set flag means an earlier logger is still shutting down.
    assert not STOP_BYTE_COUNT_LOGGER, "A byte count logger should not already be running"
    thread = threading.Thread(
        target=_log_byte_rate,
        # Accept plain string paths too: the logger thread calls Path methods (``.suffix``).
        args=(logger, interval_seconds, Path(file_path)),
        name="byte-count-logger",
        daemon=True,
    )
    thread.start()
    return thread