logging/import-logs/main.py (145 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # pylint: disable=missing-module-docstring # pylint: disable=broad-exception-caught from datetime import date, datetime, timedelta import json import math import os import sys from typing import List, Tuple, TypedDict from google.api_core import exceptions from google.cloud import logging_v2, storage # Logging limits (https://cloud.google.com/logging/quotas#api-limits) _LOGS_MAX_SIZE_BYTES = 9 * 1024 * 1024 # < 10MB # Read Cloud Run environment variables TASK_INDEX = int(os.getenv("CLOUD_RUN_TASK_INDEX", "0")) TASK_COUNT = int(os.getenv("CLOUD_RUN_TASK_COUNT", "1")) def getenv_date(name: str) -> date: """Reads environment variable and converts it to 'datetime.date'""" date_str = os.getenv(name) if not date_str: return None return datetime.strptime(date_str, "%m/%d/%Y").date() # Read import parameters' environment variables START_DATE = getenv_date("START_DATE") END_DATE = getenv_date("END_DATE") LOG_ID = os.getenv("LOG_ID") BUCKET_NAME = os.getenv("STORAGE_BUCKET_NAME") PROJECT_ID = os.getenv("PROJECT_ID") def eprint(*objects: str, **kwargs: TypedDict) -> None: """Prints objects to stderr""" print(*objects, file=sys.stderr, **kwargs) def _day(blob_name: str) -> int: """Parse day number from Blob's path Use the known Blob path convention to parse the day part from the path. The path convention is <LOG_ID>/YYYY/MM/DD/<OBJECT_NAME> """ # calculated in function to allow test to set LOG_ID offset = len(LOG_ID) + 1 + 4 + 1 + 2 + 1 return int(blob_name[offset : offset + 2]) def _is_valid_import_range() -> bool: """Validate the import date range Checks the import range dates to ensure that - start date is earlier than end date - no dates in the range are older than 29 days due to https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.timestamp """ if START_DATE > END_DATE: eprint("Start date of the import time range should be earlier than end date") return False # comment the following 3 lines if import range includes dates older than 29 days from now if (date.today() - START_DATE).days > 29: eprint("Import range includes dates older than 29 days from today.") return False return True def calc_import_range() -> Tuple[date, date]: """Calculate import range for the task based on full import range and number of tasks""" if TASK_COUNT == 1: return START_DATE, END_DATE diff = END_DATE - START_DATE if diff.days > TASK_COUNT: shard_days = math.floor(diff.days / TASK_COUNT) else: shard_days = 1 # start day is next day after prev. task end day start_date = START_DATE + timedelta((shard_days + 1) * TASK_INDEX) # when no more tasks required return (deterministic) negative range if start_date > END_DATE: return END_DATE, START_DATE if TASK_INDEX < (TASK_COUNT - 1): end_date = start_date + timedelta(shard_days) else: end_date = END_DATE return start_date, end_date def _prefix(_date: date) -> str: return f"{LOG_ID}/{_date.year:04}/{_date.month:02}/" def list_log_files(first_day: date, last_day: date, client: storage.Client) -> List: """Load paths to all log files stored in Cloud Storage in between first and last days. For log organization hierarchy see https://cloud.google.com/logging/docs/export/storage#gcs-organization. """ paths = [] # collect paths for special case when first and last days are in the same month if first_day.year == last_day.year and first_day.month == last_day.month: blobs = client.list_blobs( BUCKET_NAME, prefix=_prefix(first_day), delimiter=None ) paths = [ b.name for b in blobs if _day(b.name) >= first_day.day and _day(b.name) <= last_day.day ] return paths # collect all log file paths in first month and filter those for early days blobs = client.list_blobs(BUCKET_NAME, prefix=_prefix(first_day), delimiter=None) paths.extend([b.name for b in blobs if _day(b.name) >= first_day.day]) # process all paths in last months blobs = client.list_blobs(BUCKET_NAME, prefix=_prefix(last_day)) paths.extend([b.name for b in blobs if _day(b.name) <= last_day.day]) # process all paths in between for year in range(first_day.year, last_day.year + 1): for month in range( first_day.month + 1 if year == first_day.year else 1, last_day.month if year == last_day.year else 13, ): blobs = client.list_blobs( BUCKET_NAME, prefix=_prefix(date(year=year, month=month, day=1)) ) paths.extend([b.name for b in blobs]) return paths def _read_logs(path: str, bucket: storage.Bucket) -> List[str]: blob = bucket.blob(path) contents = blob.download_as_string() return contents.splitlines() def _write_logs(logs: List[dict], client: logging_v2.Client) -> None: try: client.logging_api.write_entries(logs) except exceptions.PermissionDenied as err2: for detail in err2.details: if isinstance(detail, logging_v2.types.WriteLogEntriesPartialErrors): # partialerrors.log_entry_errors is a dictionary # keyed by the logs' zero-based index in the logs. # consider implementing custom error handling eprint(f"{detail}") raise def _patch_entry(log: dict, project_id: str) -> None: """Modify entry fields to allow importing entry to destination project. Save logName as a user label. Replace logName with the fixed value "projects/PROJECT_ID/logs/imported_logs". Rename the obsolete key "serviceData" with "metadata". """ log_name = log.get("logName") labels = log.get("labels") log["logName"] = f"projects/{project_id}/logs/imported_logs" if not labels: labels = dict() log["labels"] = labels labels["original_logName"] = log_name # TODO: remove after the following issue is fixed: # https://github.com/googleapis/python-logging/issues/945 if "protoPayload" in log: payload = log.get("protoPayload") if "serviceData" in payload: # the following line changes the place of metadata in the dictionary payload["metadata"] = payload.pop("serviceData") # uncomment the following 2 lines if import range includes dates older than 29 days from now # labels["original_timestamp"] = log["timestamp"] # log["timestamp"] = None def import_logs( log_files: List, storage_client: storage.Client, logging_client: logging_v2.Client ) -> None: """Iterates through log files to write log entries in batched mode""" total_size, logs = 0, [] bucket = storage_client.bucket(BUCKET_NAME) for file_path in log_files: data = _read_logs(file_path, bucket) for entry in data: log = json.loads(entry) _patch_entry(log, logging_client.project) size = sys.getsizeof(log) if total_size + size >= _LOGS_MAX_SIZE_BYTES: _write_logs(logs, logging_client) total_size, logs = 0, [] total_size += size logs.append(log) if logs: _write_logs(logs, logging_client) def main() -> None: """Imports logs from Cloud Storage to Cloud Logging""" if not START_DATE or not END_DATE or not LOG_ID or not BUCKET_NAME: eprint("Missing some of required parameters") sys.exit(1) if not _is_valid_import_range(): sys.exit(1) start_date, end_date = calc_import_range() if start_date > end_date: print(f"Task #{(TASK_INDEX+1)} has no work to do") sys.exit(0) print( f"Task #{(TASK_INDEX+1)} starts importing logs from {start_date} to {end_date}" ) storage_client = storage.Client() log_files = list_log_files(start_date, end_date, storage_client) logging_client = ( logging_v2.Client(project=PROJECT_ID) if PROJECT_ID else logging_v2.Client() ) import_logs(log_files, storage_client, logging_client) # Start script if __name__ == "__main__": try: main() except Exception as err: eprint(f"Task #{TASK_INDEX+1}, failed: {err}") sys.exit(1)