auto_sizing/logging/__init__.py (37 lines of code) (raw):

import logging
from typing import Optional

import attr
import dask.distributed
from distributed.diagnostics.plugin import WorkerPlugin
from jetstream.logging import LOG_SOURCE

from .bigquery_log_handler import BigQueryLogHandler


@attr.s(auto_attribs=True)
class LogConfiguration:
    """Configuration for setting up logging."""

    log_project_id: Optional[str]
    log_dataset_id: Optional[str]
    log_table_id: Optional[str]
    log_to_bigquery: bool = False
    capacity: int = 50
    log_source: str = LOG_SOURCE.SIZING

    def setup_logger(self, client=None):
        # Root logger prints INFO and above to the console.
        logging.basicConfig(
            level=logging.INFO,
            format="%(levelname)s:%(asctime)s:%(name)s:%(message)s",
        )
        logger = logging.getLogger()

        if self.log_to_bigquery:
            bigquery_handler = BigQueryLogHandler(
                self.log_project_id,
                self.log_dataset_id,
                self.log_table_id,
                self.log_source,
                client,
                self.capacity,
            )
            # Only warnings and above are forwarded to BigQuery.
            bigquery_handler.setLevel(logging.WARNING)
            logger.addHandler(bigquery_handler)


class LogPlugin(WorkerPlugin):
    """
    Dask worker plugin for initializing the logger.

    This ensures that the BigQuery logging handler gets initialized.
    """

    def __init__(self, log_config: LogConfiguration):
        self.log_config = log_config

    def setup(self, worker: dask.distributed.Worker):
        self.log_config.setup_logger()
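
For reference, here is a minimal sketch of how these pieces could be wired up from the driver side. It assumes a reachable Dask cluster and a writable BigQuery destination; the project/dataset/table names are placeholders, and `Client.register_worker_plugin` is the long-standing `dask.distributed` registration call (renamed `register_plugin` in recent Dask releases).

from dask.distributed import Client

from auto_sizing.logging import LogConfiguration, LogPlugin

# Placeholder destination table; substitute a real project/dataset/table.
log_config = LogConfiguration(
    log_project_id="my-project",
    log_dataset_id="my_dataset",
    log_table_id="sizing_logs",
    log_to_bigquery=True,
)

# Configure logging in the driver process itself.
log_config.setup_logger()

client = Client()  # connect to (or start) a Dask cluster

# Dask calls LogPlugin.setup() on each worker as it joins,
# installing the BigQuery handler there as well.
client.register_worker_plugin(LogPlugin(log_config))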
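
The imported `BigQueryLogHandler` lives in the sibling module `bigquery_log_handler` and is not shown in this file. Going only by the call site above (project, dataset, table, log source, client, capacity), a buffering handler along the following lines would fit; this is an illustrative sketch, not the module's actual implementation, and the flush logic and row schema in particular are assumptions.

import datetime
from logging.handlers import BufferingHandler

from google.cloud import bigquery


class BigQueryLogHandler(BufferingHandler):
    """Sketch only: buffer log records and flush them to a BigQuery table."""

    def __init__(self, project_id, dataset_id, table_id, log_source, client=None, capacity=50):
        # BufferingHandler flushes once `capacity` records have accumulated.
        super().__init__(capacity)
        self.table = f"{project_id}.{dataset_id}.{table_id}"
        self.log_source = log_source
        self.client = client or bigquery.Client(project=project_id)

    def flush(self):
        self.acquire()
        try:
            # Row schema is assumed for illustration.
            rows = [
                {
                    "timestamp": datetime.datetime.fromtimestamp(
                        record.created, tz=datetime.timezone.utc
                    ).isoformat(),
                    "log_source": str(self.log_source),
                    "name": record.name,
                    "level": record.levelname,
                    "message": record.getMessage(),
                }
                for record in self.buffer
            ]
            if rows:
                self.client.insert_rows_json(self.table, rows)
            self.buffer.clear()
        finally:
            self.release()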