# ilm/ilm_lib/logs_lib.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""Data access logs utilities for ILM."""
import collections
import datetime
import enum
import json
import logging
from typing import Any, Iterable, Iterator, List, Mapping, Tuple
import apache_beam as beam
from ilm import ilm_config
from ilm import ilm_types
from ilm.ilm_lib import dicom_store_lib
from ilm.ilm_lib import pipeline_util
# Audit log entry columns
# Fully-qualified column names in the BigQuery audit-log export table;
# interpolated into the query built by get_data_access_logs_query().
_DICOMWEB_SERVICE_METHOD = 'protopayload_auditlog.methodName'
_DICOM_STORE_PATH = 'protopayload_auditlog.resourceName'
_REQUEST_TIMESTAMP = (
    'protopayload_auditlog.requestMetadata.requestAttributes.time'
)
_REQUEST_JSON = 'protopayload_auditlog.requestJson'
_STATUS_CODE = 'protopayload_auditlog.status.code'
# Audit log query result keywords
# Keys of the raw rows consumed by ParseDataAccessLogsDoFn.process().
_DICOMWEB_SERVICE_METHOD_KEY = 'methodName'
_REQUEST_TIMESTAMP_KEY = 'time'
_REQUEST_JSON_KEY = 'requestJson'
_REQUEST_JSON_DICOMWEB_PATH_KEY = 'dicomWebPath'
class AuditLogEntryParsingError(Exception):
  """Error parsing an audit log entry.

  NOTE(review): not raised anywhere in this module; presumably part of the
  package's public error surface for callers — confirm before removing.
  """
  pass
class DicomWebServiceMethods(enum.Enum):
  """DICOM Web Service API methods for accessing instances in DICOM Store."""
  # Single-instance retrievals: counted as one instance access.
  RETRIEVE_INSTANCE = 'RetrieveInstance'
  RETRIEVE_RENDERED_INSTANCE = 'RetrieveRenderedInstance'
  # Frame retrievals: counted as frame accesses, one per requested frame.
  RETRIEVE_FRAMES = 'RetrieveFrames'
  RETRIEVE_RENDERED_FRAMES = 'RetrieveRenderedFrames'
  # Study/series retrievals: expanded to one access per contained instance.
  RETRIEVE_STUDY = 'RetrieveStudy'
  RETRIEVE_SERIES = 'RetrieveSeries'
# TODO: Adapt query to fetch #days in current storage class.
def get_data_access_logs_query(
    ilm_cfg: ilm_config.ImageLifecycleManagementConfig,
) -> str:
  """Generates query to fetch DICOM instances access requests in data logs.

  Args:
    ilm_cfg: ILM pipeline configuration; supplies the audit-logs BigQuery
      table, the DICOM store path and an optional start-date filter.

  Returns:
    BigQuery SQL query string selecting (method name, store path, request
    timestamp, request JSON) for successful DICOMweb retrieve requests on
    the configured store.
  """
  methods = [
      'REGEXP_CONTAINS('
      ' protoPayload_auditlog.methodName, '
      # NOTE: dots in the pattern are unescaped and match any character;
      # that only loosens the match by single-char wildcards and cannot
      # admit a different DICOMweb method name (anchored + literal suffix).
      f' "^(google.cloud.healthcare.(v1|v1beta1).dicomweb.DicomWebService.{method.value})$"'
      ')'
      for method in DicomWebServiceMethods
  ]
  method_conditions = ' OR '.join(methods)
  if ilm_cfg.logs_config.log_entries_date_equal_or_after:
    # The config field is "equal_or_after", so the boundary is inclusive:
    # use >= so entries stamped exactly at midnight of the start date are
    # kept (a strict > would silently drop them).
    date_condition = (
        f' AND {_REQUEST_TIMESTAMP} >= PARSE_TIMESTAMP("%Y%m%d",'
        f' "{ilm_cfg.logs_config.log_entries_date_equal_or_after}")'
    )
  else:
    date_condition = ''
  return (
      'SELECT '
      f' {_DICOMWEB_SERVICE_METHOD}, {_DICOM_STORE_PATH}, '
      f' {_REQUEST_TIMESTAMP}, {_REQUEST_JSON} '
      f'FROM `{ilm_cfg.logs_config.logs_bigquery_table}` '
      'WHERE '
      f' ({method_conditions}) '
      f' AND {_STATUS_CODE} IS NULL'  # Ignore requests with error status.
      ' AND'
      f' {_DICOM_STORE_PATH}="{ilm_cfg.dicom_store_config.dicom_store_path}"'
      f' {date_condition}'
  )
class ParseDataAccessLogsDoFn(beam.DoFn):
  """DoFn to parse relevant metadata in data access logs.

  Converts raw audit-log rows into (instance DICOMweb path, LogAccessCount)
  pairs, expanding study/series retrievals into per-instance counts via the
  DICOM store.
  """
  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._cfg = ilm_cfg
    # Fixed "now" reference used to convert log timestamps into days-ago.
    self._today = datetime.datetime.now(tz=datetime.timezone.utc)
    # Also rebuilt in setup(); presumably because these members do not
    # survive Beam's serialization of the DoFn to workers — TODO confirm.
    self._dicom_store_client = dicom_store_lib.DicomStoreClient(self._cfg)
    self._throttler = pipeline_util.Throttler(
        self._cfg.logs_config.max_dicom_store_qps
    )
  def setup(self) -> None:
    # Beam DoFn lifecycle hook: recreate the store client and throttler on
    # each worker before processing starts.
    self._dicom_store_client = dicom_store_lib.DicomStoreClient(self._cfg)
    self._throttler = pipeline_util.Throttler(
        self._cfg.logs_config.max_dicom_store_qps
    )
  def _cleanup_dicomweb_path(self, dicomweb_path: str) -> Tuple[str, int]:
    """Cleans up DICOMweb path and counts # of requested frames, if present.

    Args:
      dicomweb_path: request DICOMweb path, which may or may not contain
        {series, instances, frames, rendered} parts:
        studies/<>/series/<>/instances/<>/frames/<>/rendered

    Returns:
      DICOMweb path without /frames* or /rendered suffix, and number of frames
      if present.
    """
    dicomweb_path = dicomweb_path.removesuffix('/rendered')
    if 'frames' not in dicomweb_path:
      return dicomweb_path, 0
    # Frames are requested as a comma-separated list, e.g. /frames/1,2,3.
    dicomweb_path, frames = dicomweb_path.split('/frames/')
    return dicomweb_path, len(frames.split(','))
  def _instance_access_count(
      self,
      dicomweb_service_method: str,
      dicomweb_path: str,
      access_num_days: int,
  ) -> List[Tuple[str, ilm_types.LogAccessCount]]:
    """Maps one log entry to per-instance access counts.

    Args:
      dicomweb_service_method: fully-qualified DICOMweb method name from the
        log entry; matched by suffix against DicomWebServiceMethods values.
      dicomweb_path: DICOMweb path from the request JSON.
      access_num_days: how many days ago the access happened.

    Returns:
      List of (instance DICOMweb path, LogAccessCount) pairs; empty if the
      method is not a supported retrieval or instance listing fails.
    """
    dicomweb_path, request_num_frames = self._cleanup_dicomweb_path(
        dicomweb_path
    )
    # Single-instance retrieval: one instance access on that path.
    if dicomweb_service_method.endswith(
        DicomWebServiceMethods.RETRIEVE_INSTANCE.value
    ) or dicomweb_service_method.endswith(
        DicomWebServiceMethods.RETRIEVE_RENDERED_INSTANCE.value
    ):
      log_access_count = ilm_types.LogAccessCount(
          frames_access_count=None,
          instance_access_count=ilm_config.AccessCount(
              num_days=access_num_days, count=1.0
          ),
      )
      return [(dicomweb_path, log_access_count)]
    # Study/series retrieval: list the contained instances from the DICOM
    # store (rate-limited) and count one access per instance.
    if dicomweb_service_method.endswith(
        DicomWebServiceMethods.RETRIEVE_STUDY.value
    ) or dicomweb_service_method.endswith(
        DicomWebServiceMethods.RETRIEVE_SERIES.value
    ):
      try:
        self._throttler.wait()
        instances = self._dicom_store_client.fetch_instances(dicomweb_path)
      except dicom_store_lib.DicomStoreError as e:
        # Best-effort: a failed listing drops this log entry rather than
        # failing the pipeline.
        logging.warning(
            'Unable to fetch instances for %s: %s. Skipping.', dicomweb_path, e
        )
        return []
      log_access_count = ilm_types.LogAccessCount(
          frames_access_count=None,
          instance_access_count=ilm_config.AccessCount(
              num_days=access_num_days, count=1.0
          ),
      )
      return [(instance, log_access_count) for instance in instances]
    # Frame retrieval: count one frame access per requested frame.
    if dicomweb_service_method.endswith(
        DicomWebServiceMethods.RETRIEVE_FRAMES.value
    ) or dicomweb_service_method.endswith(
        DicomWebServiceMethods.RETRIEVE_RENDERED_FRAMES.value
    ):
      log_access_count = ilm_types.LogAccessCount(
          frames_access_count=ilm_config.AccessCount(
              num_days=access_num_days, count=request_num_frames
          ),
          instance_access_count=None,
      )
      return [(dicomweb_path, log_access_count)]
    # Unsupported method: nothing to count.
    return []
  def process(
      self, raw_log_entry: Mapping[str, Any]
  ) -> Iterator[Tuple[str, ilm_types.LogAccessCount]]:
    """Parses a raw audit-log row; yields (instance, LogAccessCount) pairs.

    Malformed entries (missing keys, unparsable request JSON) are logged and
    skipped. Instances in the configured disallow list are filtered out.
    """
    try:
      log_date = raw_log_entry[_REQUEST_TIMESTAMP_KEY]
      dicomweb_service_method = raw_log_entry[_DICOMWEB_SERVICE_METHOD_KEY]
      request_json = json.loads(raw_log_entry[_REQUEST_JSON_KEY])
      dicomweb_path = request_json[_REQUEST_JSON_DICOMWEB_PATH_KEY]
    except (KeyError, json.decoder.JSONDecodeError, TypeError) as exp:
      logging.warning(
          'Parsing of log entry %s failed with error: %s. Skipping.',
          raw_log_entry,
          exp,
      )
      return
    # NOTE(review): assumes log_date is a timezone-aware datetime (astimezone
    # on a naive value would interpret it as local time) — TODO confirm the
    # BigQuery client's timestamp decoding.
    log_date = log_date.astimezone(datetime.timezone.utc)
    # Clamp at 0 in case of clock skew producing a future timestamp.
    access_num_days = max((self._today - log_date).days, 0)
    instance_to_access_count = self._instance_access_count(
        dicomweb_service_method, dicomweb_path, access_num_days
    )
    for instance, log_access_count in instance_to_access_count:
      if instance in self._cfg.instances_disallow_list:
        continue
      yield instance, log_access_count
def _merge_same_day_access_counts(
    access_counts: List[ilm_config.AccessCount],
) -> List[ilm_config.AccessCount]:
  """Sums access counts that share the same num_days into a single entry."""
  totals = {}
  for access_count in access_counts:
    previous = totals.get(access_count.num_days, 0.0)
    totals[access_count.num_days] = previous + access_count.count
  return [
      ilm_config.AccessCount(num_days=days, count=total)
      for days, total in totals.items()
  ]
def compute_log_access_metadata(
    instance_to_access_counts: Tuple[str, Iterable[ilm_types.LogAccessCount]],
) -> Tuple[str, ilm_types.LogAccessMetadata]:
  """Computes log access metadata, merging same day access counts."""
  instance, access_counts = instance_to_access_counts
  # Partition each log count into frame-level vs. instance-level buckets.
  frames_access_counts = []
  instance_access_counts = []
  for access in access_counts:
    if access.frames_access_count is not None:
      frames_access_counts.append(access.frames_access_count)
    if access.instance_access_count is not None:
      instance_access_counts.append(access.instance_access_count)
  metadata = ilm_types.LogAccessMetadata(
      frames_access_counts=_merge_same_day_access_counts(frames_access_counts),
      instance_access_counts=_merge_same_day_access_counts(
          instance_access_counts
      ),
  )
  return instance, metadata