auto_sizing/logging/bigquery_log_handler.py
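"""Buffered logging handler that ships batches of log records to BigQuery."""
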
import datetime
from logging.handlers import BufferingHandler
from typing import Optional
from google.cloud import bigquery


class BigQueryLogHandler(BufferingHandler):
    """Custom logging handler for writing logs to BigQuery."""

    def __init__(
self,
project_id: str,
dataset_id: str,
table_id: str,
source: str,
client: Optional[bigquery.Client] = None,
        capacity: int = 50,
):
self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
        # Fall back to a default client for the project if none is supplied.
        self.client = client if client is not None else bigquery.Client(project_id)
self.source = source
super().__init__(capacity)

    def _buffer_to_json(self, buffer):
        """Converts the records in the buffer to JSON."""
        return [
            {
                "timestamp": datetime.datetime.fromtimestamp(record.created).strftime(
                    "%Y-%m-%d %H:%M:%S"
                ),
                # Attributes attached to a record via `extra=` override the defaults.
                "source": getattr(record, "source", self.source),
                "experiment": getattr(record, "experiment", None),
                "metric": getattr(record, "metric", None),
                "statistic": getattr(record, "statistic", None),
                "message": record.getMessage(),
                "log_level": record.levelname,
                "exception": None if not record.exc_info else str(record.exc_info),
                "filename": record.filename,
                "func_name": record.funcName,
                "exception_type": None if not record.exc_info else record.exc_info[0].__name__,
            }
            for record in buffer
        ]

    def flush(self):
        """
        Override default flushing behaviour.

        Write the buffer to BigQuery instead of discarding it.
        """
        self.acquire()
        try:
            if self.buffer:
                destination_table = f"{self.project_id}.{self.dataset_id}.{self.table_id}"
                self.client.load_table_from_json(
                    self._buffer_to_json(self.buffer), destination_table
                ).result()
            self.buffer = []
        except Exception as e:
            # A failure to ship logs should never crash the host application.
            print(f"Exception while flushing logs: {e}")
        finally:
            self.release()
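
Below is a usage sketch, not part of the module. It assumes the destination
table exists with columns matching the keys produced by _buffer_to_json; the
project, dataset, and table names are hypothetical, and the schema types are
assumptions rather than anything mandated by the handler.

import logging

from google.cloud import bigquery

from auto_sizing.logging.bigquery_log_handler import BigQueryLogHandler

# Hypothetical schema compatible with the rows built by _buffer_to_json.
LOG_SCHEMA = [
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("source", "STRING"),
    bigquery.SchemaField("experiment", "STRING"),
    bigquery.SchemaField("metric", "STRING"),
    bigquery.SchemaField("statistic", "STRING"),
    bigquery.SchemaField("message", "STRING"),
    bigquery.SchemaField("log_level", "STRING"),
    bigquery.SchemaField("exception", "STRING"),
    bigquery.SchemaField("filename", "STRING"),
    bigquery.SchemaField("func_name", "STRING"),
    bigquery.SchemaField("exception_type", "STRING"),
]

client = bigquery.Client(project="my-gcp-project")  # hypothetical project
client.create_table(
    bigquery.Table("my-gcp-project.monitoring.sizing_logs", schema=LOG_SCHEMA),
    exists_ok=True,
)

logger = logging.getLogger(__name__)
handler = BigQueryLogHandler(
    project_id="my-gcp-project",  # hypothetical names throughout
    dataset_id="monitoring",
    table_id="sizing_logs",
    source="auto_sizing",
    client=client,
    capacity=10,  # buffer flushes to BigQuery after every 10 records
)
handler.setLevel(logging.INFO)
logger.addHandler(handler)

# Extra attributes map onto the optional columns handled in _buffer_to_json.
logger.info("Analysis complete", extra={"experiment": "my-experiment-slug"})

# BufferingHandler flushes only when capacity is reached or on close(),
# so flush explicitly before the process exits.
handler.flush()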