ilm/ilm_lib/pipeline_util.py (128 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""Pipeline utilities."""
import collections
import dataclasses
import logging
import time
from typing import Any, Iterable, Iterator, Optional, Tuple
from google.cloud import storage
from ilm import ilm_config
from ilm import ilm_types
def read_gcs_file(
    gcs_uri: str,
    storage_client: Optional[storage.Client] = None,
) -> str:
  """Downloads a GCS object and returns its contents as text.

  Args:
    gcs_uri: URI of the object to read, as accepted by
      `storage.Blob.from_string`.
    storage_client: Client used for the download. When None, a new
      `storage.Client()` is created, which is what this function always did
      before the parameter existed.

  Returns:
    The object contents as returned by `Blob.download_as_text()`.
  """
  if storage_client is None:
    storage_client = storage.Client()
  blob = storage.Blob.from_string(gcs_uri, client=storage_client)
  return blob.download_as_text()
def write_gcs_file(
    file_content: str,
    gcs_uri: str,
    storage_client: Optional[storage.Client] = None,
):
  """Uploads a string to the GCS object identified by a URI.

  Args:
    file_content: Text to upload.
    gcs_uri: URI of the destination object.
    storage_client: Client to use for the upload. A new `storage.Client()` is
      created when None.
  """
  client = storage.Client() if storage_client is None else storage_client
  target_blob = storage.Blob.from_string(gcs_uri, client=client)
  target_blob.upload_from_string(file_content)
def delete_gcs_file(
    gcs_uri: str,
    storage_client: Optional[storage.Client] = None,
):
  """Deletes the GCS object identified by a URI.

  Args:
    gcs_uri: URI of the object to delete.
    storage_client: Client to use for the deletion. A new `storage.Client()`
      is created when None.
  """
  client = storage.Client() if storage_client is None else storage_client
  storage.Blob.from_string(gcs_uri, client=client).delete()
def read_ilm_config(
    config_gcs_uri: str,
) -> ilm_config.ImageLifecycleManagementConfig:
  """Loads the ILM pipeline config stored as JSON in GCS.

  Args:
    config_gcs_uri: GCS URI of the config file.

  Returns:
    The config parsed by `ImageLifecycleManagementConfig.from_json`.

  Raises:
    Exception: Any error raised while parsing is logged and re-raised.
      Errors from the GCS read itself are not caught here.
  """
  raw_config = read_gcs_file(config_gcs_uri)
  try:
    parsed_config = ilm_config.ImageLifecycleManagementConfig.from_json(
        raw_config
    )
  except Exception as err:
    # Log which URI held the bad config before propagating the failure.
    logging.error(
        'Failed to read ILM config from %s with error: %s',
        config_gcs_uri,
        str(err),
    )
    raise err
  return parsed_config
def should_keep_instance(
    instance: Tuple[str, Any],
    ilm_cfg: ilm_config.ImageLifecycleManagementConfig,
) -> bool:
  """Returns whether an instance passes the config's disallow list.

  Args:
    instance: Pair whose first element is the key checked against the
      disallow list; the second element is not inspected.
    ilm_cfg: Config providing `instances_disallow_list`.

  Returns:
    False if the instance key is in `ilm_cfg.instances_disallow_list`, True
    otherwise.
  """
  instance_key = instance[0]
  is_disallowed = instance_key in ilm_cfg.instances_disallow_list
  return not is_disallowed
def _compute_cumulative_access_count(
    log_access_metadata: ilm_types.LogAccessMetadata, num_frames: int
) -> ilm_types.AccessMetadata:
  """Computes cumulative access counts from frame and instance accesses.

  Frame access counts are summed per `num_days` and each per-day sum is
  divided by `num_frames`. Instance access counts are then added per
  `num_days`. The result is a running total over `num_days` in ascending
  order.

  Args:
    log_access_metadata: Provides `frames_access_counts` and
      `instance_access_counts`, each a collection of entries exposing
      `num_days` and `count`.
    num_frames: Divisor applied to the per-day frame totals. It is only used
      when there is at least one frame access entry, and must be non-zero in
      that case.

  Returns:
    Access metadata whose `cumulative_access_counts` holds one entry per
    distinct `num_days`, sorted ascending, with `count` equal to the sum of
    all per-day counts up to and including that `num_days`.
  """
  frame_totals = collections.defaultdict(float)
  for access in log_access_metadata.frames_access_counts:
    frame_totals[access.num_days] += access.count

  # Normalize each per-day frame total exactly once. Iterating over the
  # access entries instead would divide a bucket by num_frames once per
  # entry, under-counting whenever several entries share the same num_days.
  num_days_to_count = collections.defaultdict(float)
  for num_days, frame_total in frame_totals.items():
    num_days_to_count[num_days] = frame_total / num_frames

  for access in log_access_metadata.instance_access_counts:
    num_days_to_count[access.num_days] += access.count

  cumulative_access_counts = []
  total_count = 0.0
  for num_days in sorted(num_days_to_count):
    total_count += num_days_to_count[num_days]
    cumulative_access_counts.append(
        ilm_config.AccessCount(count=total_count, num_days=num_days)
    )
  return ilm_types.AccessMetadata(
      cumulative_access_counts=cumulative_access_counts
  )
def include_access_count_in_metadata(
    merged_access_count_and_metadata: Tuple[
        str,
        Tuple[
            Iterable[ilm_types.LogAccessMetadata],
            Iterable[ilm_types.InstanceMetadata],
        ],
    ],
) -> Iterator[ilm_types.InstanceMetadata]:
  """Attaches cumulative access counts to an instance's metadata.

  Expects exactly one InstanceMetadata element and zero or one
  LogAccessMetadata element for the instance. With no access element, an
  empty LogAccessMetadata is used. With no metadata element, the instance is
  logged and skipped.

  Args:
    merged_access_count_and_metadata: tuple(instance, tuple(access, metadata)).

  Yields:
    The instance metadata with `access_metadata` set to the cumulative access
    counts computed from both frame and instance access counts.

  Raises:
    ValueError: If there is more than one access element or more than one
      metadata element for the instance.
  """
  instance, (access_iterable, metadata_iterable) = (
      merged_access_count_and_metadata
  )

  access_entries = list(access_iterable)
  if len(access_entries) > 1:
    raise ValueError(
        f'Invalid access count for instance {instance}: {access_entries}'
    )
  if not access_entries:
    # Instance was not accessed
    access_entries = [ilm_types.LogAccessMetadata([], [])]

  metadata_entries = list(metadata_iterable)
  if not metadata_entries:
    logging.info(
        'DICOM metadata missing for instance %s. Instance may have been '
        'deleted. Skipping.',
        instance,
    )
    return
  if len(metadata_entries) != 1:
    raise ValueError(
        f'Invalid metadata for instance {instance}: {metadata_entries}'
    )

  (metadata,) = metadata_entries
  (access,) = access_entries

  if access.frames_access_counts and not metadata.num_frames:
    # Frame counts get divided by num_frames, so a usable value is required.
    logging.warning(
        'Instance %s with RetrieveFrames request is missing number of frames '
        'in metadata. Assuming number of frames is 1.',
        metadata.instance,
    )
    metadata = dataclasses.replace(metadata, num_frames=1)

  yield dataclasses.replace(
      metadata,
      access_metadata=_compute_cumulative_access_count(
          access, metadata.num_frames
      ),
  )
class Throttler:
  """Basic throttler for keeping an approximate target QPS or below.

  Consecutive `wait()` calls return at least `1 / target_qps` seconds apart.
  Intervals are measured with `time.monotonic()` so that wall-clock
  adjustments cannot lengthen or skip a wait. No locking is performed.
  """

  def __init__(self, target_qps: float):
    """Initializes the throttler.

    Args:
      target_qps: Maximum number of `wait()` returns per second to aim for.

    Raises:
      ValueError: If `target_qps` is zero or negative.
    """
    if target_qps <= 0:
      raise ValueError('Target QPS must be positive.')
    self._delta_between_requests = 1 / target_qps
    # The interval starts at construction, so the first wait() is throttled
    # relative to when the throttler was created.
    self._last_request_time = time.monotonic()

  def wait(self) -> None:
    """Sleeps until the minimum interval since the last request has passed."""
    elapsed = time.monotonic() - self._last_request_time
    remaining = self._delta_between_requests - elapsed
    if remaining > 0:
      time.sleep(remaining)
    self._last_request_time = time.monotonic()