ilm/ilm_config.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""Image Lifecycle Management config."""
import dataclasses
import enum
from typing import List, Optional, Set

import dataclasses_json


class InvalidConfigError(Exception):
  """Raised when an ILM configuration value is invalid."""


class StorageClass(enum.Enum):
STANDARD = 'STANDARD'
NEARLINE = 'NEARLINE'
COLDLINE = 'COLDLINE'
  ARCHIVE = 'ARCHIVE'


# Relative availability ranking of storage classes; a higher value means
# higher availability (STANDARD is the most available, ARCHIVE the least).
_STORAGE_CLASS_AVAILABILITY = {
StorageClass.STANDARD: 3,
StorageClass.NEARLINE: 2,
StorageClass.COLDLINE: 1,
StorageClass.ARCHIVE: 0,
}


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class AccessCount:
  """Access count over a trailing window of days."""

  # Number of accesses, e.g. count=5 with num_days=30 means five accesses
  # within the last 30 days.
  count: float
  # Length of the trailing window, in days.
  num_days: int

  def __post_init__(self):
if self.count < 0:
raise InvalidConfigError('Access count must be >= 0.')
if self.num_days < 0:
      raise InvalidConfigError('Number of days in access count must be >= 0.')


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class PixelSpacingRange:
  """Inclusive pixel spacing range; an unset bound is unbounded."""

  min: Optional[float] = None
  max: Optional[float] = None
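  # E.g., a sketch matching any pixel spacing of at least 0.001 (DICOM
  # PixelSpacing values are in mm), with max left unbounded:
  #   PixelSpacingRange(min=0.001)
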
def __post_init__(self):
    if self.min is None and self.max is None:
raise InvalidConfigError(
'At least one of (min, max) in pixel spacing range must be set.'
      )


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class ToLowerAvailabilityCondition:
"""Condition for moving to a lower availability storage class."""
modality: Optional[str] = None
sop_class_uid: Optional[str] = None
# Min, max range inclusive, i.e.,
# range min <= instance pixel spacing <= range max
pixel_spacing_range: Optional[PixelSpacingRange] = None
# Date format: YYYYMMDD. See StorageClassConfig for relative date priority.
date_before: Optional[str] = None
size_bytes_larger_than: Optional[int] = None
access_count_lower_or_equal_to: Optional[AccessCount] = None
num_days_in_current_storage_class_higher_than: Optional[int] = None
# Set of concatenated image type array values to match with, e.g.
# {"DERIVED/PRIMARY/VOLUME/RESAMPLED", "ORIGINAL/PRIMARY/VOLUME"}
# Please note /NONE suffixes should be added as a separate value if
# applicable, i.e. for example
# image_type = {"ORIGINAL/PRIMARY/VOLUME"} in the ILM config will NOT
# automatically match with instances that have
# image type = [ORIGINAL, PRIMARY, VOLUME, NONE] and vice versa, so
# both {"ORIGINAL/PRIMARY/VOLUME", "ORIGINAL/PRIMARY/VOLUME/NONE"}
# should be added to the config if this is desirable.
image_type: Optional[Set[str]] = None
def __str__(self):
params = []
if self.modality:
params.append(f'modality={self.modality}')
if self.sop_class_uid:
params.append(f'sop_class_uid={self.sop_class_uid}')
if self.pixel_spacing_range:
params.append(f'pixel_spacing_range={self.pixel_spacing_range}')
if self.date_before:
params.append(f'date_before={self.date_before}')
    if self.size_bytes_larger_than is not None:
params.append(f'size_bytes_larger_than={self.size_bytes_larger_than}')
if self.access_count_lower_or_equal_to:
params.append(
f'access_count_lower_or_equal_to={self.access_count_lower_or_equal_to}'
)
    if self.num_days_in_current_storage_class_higher_than is not None:
params.append(
f'num_days_in_current_storage_class_higher_than={self.num_days_in_current_storage_class_higher_than}'
)
if self.image_type:
params.append(f'image_type={self.image_type}')
    return 'ToLowerAvailabilityCondition(' + ', '.join(params) + ')'


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class ToHigherAvailabilityCondition:
"""Condition for moving to a higher availability storage class."""
modality: Optional[str] = None
sop_class_uid: Optional[str] = None
# Min, max range inclusive, i.e.,
# range min <= instance pixel spacing <= range max
pixel_spacing_range: Optional[PixelSpacingRange] = None
# Date format: YYYYMMDD. See StorageClassConfig for relative date priority.
date_after: Optional[str] = None
size_bytes_lower_than: Optional[int] = None
access_count_higher_or_equal_to: Optional[AccessCount] = None
num_days_in_current_storage_class_higher_than: Optional[int] = None
# Set of concatenated image type array values to match with, e.g.
# {"DERIVED/PRIMARY/VOLUME/RESAMPLED", "ORIGINAL/PRIMARY/VOLUME"}
# Please note /NONE suffixes should be added as a separate value if
# applicable, i.e. for example
# image_type = {"ORIGINAL/PRIMARY/VOLUME"} in the ILM config will NOT
# automatically match with instances that have
# image type = [ORIGINAL, PRIMARY, VOLUME, NONE] and vice versa, so
# both {"ORIGINAL/PRIMARY/VOLUME", "ORIGINAL/PRIMARY/VOLUME/NONE"}
# should be added to the config if this is desirable.
image_type: Optional[Set[str]] = None
def __str__(self):
params = []
if self.modality:
params.append(f'modality={self.modality}')
if self.sop_class_uid:
params.append(f'sop_class_uid={self.sop_class_uid}')
if self.pixel_spacing_range:
params.append(f'pixel_spacing_range={self.pixel_spacing_range}')
if self.date_after:
params.append(f'date_after={self.date_after}')
    if self.size_bytes_lower_than is not None:
params.append(f'size_bytes_lower_than={self.size_bytes_lower_than}')
if self.access_count_higher_or_equal_to:
params.append(
f'access_count_higher_or_equal_to={self.access_count_higher_or_equal_to}'
)
    if self.num_days_in_current_storage_class_higher_than is not None:
params.append(
f'num_days_in_current_storage_class_higher_than={self.num_days_in_current_storage_class_higher_than}'
)
if self.image_type:
params.append(f'image_type={self.image_type}')
    return 'ToHigherAvailabilityCondition(' + ', '.join(params) + ')'


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class MoveRule:
"""Rule for moving an instance from one storage class to another.
If moving from a lower to a higher availability storage class,
upgrade_conditions must be populated. Conversely, if moving from a higher
to a lower availability storage class, downgrade_conditions must be
populated.
Instances are moved if ANY of the conditions in the downgrade/upgrade
list are met. For a condition to be satisfied, ALL criteria within it must
be satisfied.
E.g.: Given a move rule:
from_storage_class = STANDARD
to_storage_class = ARCHIVE
downgrade_conditions = [
(modality = 'SM'),
    (modality = 'MR', access_count_lower_or_equal_to = (count = 0, num_days = 10)),
    (modality = 'MR', access_count_lower_or_equal_to = (count = 5, num_days = 30)),
  ]

  Instances will be moved if:
(modality = SM)
OR
(modality = MR AND they were not accessed in last 10 days)
OR
(modality = MR AND they were accessed 5 times or less in last 30 days)
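
  A minimal construction sketch of the rule above (values are hypothetical):

    MoveRule(
        from_storage_class=StorageClass.STANDARD,
        to_storage_class=StorageClass.ARCHIVE,
        downgrade_conditions=[
            ToLowerAvailabilityCondition(modality='SM'),
            ToLowerAvailabilityCondition(
                modality='MR',
                access_count_lower_or_equal_to=AccessCount(count=0, num_days=10),
            ),
            ToLowerAvailabilityCondition(
                modality='MR',
                access_count_lower_or_equal_to=AccessCount(count=5, num_days=30),
            ),
        ],
    )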
"""
# Current storage class for DICOM instance.
from_storage_class: StorageClass
# New storage class for DICOM instance.
to_storage_class: StorageClass
  # Conditions for moving instances between the storage classes above; set
  # exactly one of the two lists, matching the direction of the move.
upgrade_conditions: Optional[List[ToHigherAvailabilityCondition]] = None
downgrade_conditions: Optional[List[ToLowerAvailabilityCondition]] = None
def __post_init__(self):
"""Validates rule parameters."""
if self.from_storage_class == self.to_storage_class:
raise InvalidConfigError(
'Current storage class and new storage class must be different.'
)
if self.upgrade_conditions and self.downgrade_conditions:
raise InvalidConfigError(
'Only one of {upgrade_conditions, downgrade_conditions} can be set.'
)
from_availability = _STORAGE_CLASS_AVAILABILITY[self.from_storage_class]
to_availability = _STORAGE_CLASS_AVAILABILITY[self.to_storage_class]
if from_availability > to_availability and not self.downgrade_conditions:
raise InvalidConfigError(
          'Moving to a lower availability storage class. Expected '
'downgrade_conditions to be set.'
)
elif from_availability < to_availability and not self.upgrade_conditions:
raise InvalidConfigError(
          'Moving to a higher availability storage class. Expected '
'upgrade_conditions to be set.'
)
  @property
  def conditions(
      self,
  ) -> List[ToHigherAvailabilityCondition | ToLowerAvailabilityCondition]:
    """Returns whichever condition list is set, or an empty list."""
if self.upgrade_conditions:
return self.upgrade_conditions
if self.downgrade_conditions:
return self.downgrade_conditions
    return []


class DateTags(enum.Enum):
  """DICOM date tags supported for date-based rule conditions."""
ACQUISITION_DATE = 'ACQUISITION_DATE'
CONTENT_DATE = 'CONTENT_DATE'
SERIES_DATE = 'SERIES_DATE'
  STUDY_DATE = 'STUDY_DATE'


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class StorageClassConfig:
"""Heuristic config for moving instances between storage classes.
Rules are applied in order, i.e. rule[i] has a higher priority than rule[i+1]
when both apply to the same origin storage class.
E.g. Given a storage class config
move_rules = (
(from_storage_class: STANDARD, to_storage_class: ARCHIVE, conditions0),
(from_storage_class: ARCHIVE, to_storage_class: COLDLINE, conditions1),
    (from_storage_class: STANDARD, to_storage_class: NEARLINE, conditions2),
  )
  and an instance in STANDARD storage, if both conditions0 and conditions2
  are met, the instance is moved to ARCHIVE, since rule[0] has a higher
  priority than rule[2].
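
  A minimal construction sketch (rules and values are hypothetical):

    StorageClassConfig(
        move_rules=[
            MoveRule(
                from_storage_class=StorageClass.STANDARD,
                to_storage_class=StorageClass.ARCHIVE,
                downgrade_conditions=[
                    ToLowerAvailabilityCondition(modality='SM'),
                ],
            ),
        ],
        date_priority=[DateTags.STUDY_DATE, DateTags.ACQUISITION_DATE],
    )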
"""
move_rules: List[MoveRule]
# Relative priority for date tags in case some dates are not present,
# from highest to lowest.
# E.g. if date_priority = ['STUDY_DATE', 'ACQUISITION_DATE', 'SERIES_DATE'],
# the study date will be used. Acquisition date will be used if study date is
# not defined, and so on.
date_priority: List[DateTags] = dataclasses.field(
default_factory=lambda: [
DateTags.CONTENT_DATE,
DateTags.SERIES_DATE,
DateTags.STUDY_DATE,
DateTags.ACQUISITION_DATE,
]
)
def __post_init__(self):
if not self.move_rules:
raise InvalidConfigError('move_rules in StorageClassConfig must be set.')
if not self.date_priority:
raise InvalidConfigError(
'date_priority in StorageClassConfig must be defined.'
      )


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class DicomStoreConfig:
"""DICOM store configuration."""
# DICOM Store to perform storage class changes on.
# Expects full resource name format, i.e.
# projects/<project>/locations/<location>/datasets/<dataset>/dicomStores/<ds>
dicom_store_path: str
# BigQuery table streaming from DICOM Store above
# Format: <project id>.<dataset id>.<table id>
dicom_store_bigquery_table: str
# Maximum number of instances to batch into a single
# SetBlobStorageSettingsRequest using the filter config.
set_storage_class_max_num_instances: int = 10_000
# Timeout in minutes for SetBlobStorageSettingsRequest operations.
set_storage_class_timeout_min: int = 60
# Whether to delete SetBlobStorageSettingsRequest filter files written to GCS
# after timeout defined above.
set_storage_class_delete_filter_files: bool = True
# Maximum QPS for DICOM store requests for each beam worker when sending
# SetBlobStorageSettings requests.
# Consider adjusting in accordance with beam pipeline WorkerOptions.
max_dicom_store_qps: float = 2.0
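
  # A minimal instantiation sketch (resource names below are hypothetical):
  #   DicomStoreConfig(
  #       dicom_store_path=(
  #           'projects/my-project/locations/us-central1/datasets/my-dataset'
  #           '/dicomStores/my-store'
  #       ),
  #       dicom_store_bigquery_table='my-project.my_dataset.dicom_metadata',
  #   )
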
def __post_init__(self):
if not self.dicom_store_path:
raise InvalidConfigError('dicom_store_path must be set.')
if not self.dicom_store_bigquery_table:
raise InvalidConfigError('dicom_store_bigquery_table must be set.')
if self.set_storage_class_max_num_instances <= 0:
raise InvalidConfigError(
'set_storage_class_max_num_instances must be positive.'
)
if self.set_storage_class_timeout_min <= 0:
raise InvalidConfigError(
'set_storage_class_timeout_min must be positive.'
)
if self.max_dicom_store_qps <= 0:
      raise InvalidConfigError('max_dicom_store_qps must be positive.')


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class DataAccessLogsConfiguration:
"""Data Access audit logs configuration."""
  # BigQuery table for Data Access audit logs sink.
# Format: <project id>.<dataset id>.<table id>
# May also include wildcard for multiple tables,
# e.g. <project id>.<dataset id>.<table prefix>*
logs_bigquery_table: str
# Ignore access log entries before this date. Date format: YYYYMMDD.
log_entries_date_equal_or_after: Optional[str] = None
# Maximum QPS for DICOM store requests for each beam worker when parsing logs.
# Consider adjusting in accordance with beam pipeline WorkerOptions.
# DICOM Store is queried to fetch all instances accessed in the case of
# RetrieveSeries or RetrieveStudy requests.
max_dicom_store_qps: float = 2.0
def __post_init__(self):
if not self.logs_bigquery_table:
raise InvalidConfigError('logs_bigquery_table must be set.')
if self.max_dicom_store_qps <= 0:
      raise InvalidConfigError('max_dicom_store_qps must be positive.')


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class ReportConfiguration:
"""Configuration for generating storage class updates report."""
# GCS file to write summarized report of storage class updates to. Includes
# counters of number of instances updated per move rule condition.
summarized_results_report_gcs_uri: str
# GCS file to write detailed report of storage class updates to. Includes
# the actual instances updated per move rule condition.
detailed_results_report_gcs_uri: Optional[str] = None
def __post_init__(self):
if not self.summarized_results_report_gcs_uri:
raise InvalidConfigError('summarized_results_report_gcs_uri must be set.')
if '{}' not in self.summarized_results_report_gcs_uri:
raise InvalidConfigError(
'summarized_results_report_gcs_uri must include "{}" placeholder in '
'filename for timestamp, e.g. '
'"gs://<your-bucket>/<some-dir>/summarized_report_{}.csv"'
)
if (
self.detailed_results_report_gcs_uri
and '{}' not in self.detailed_results_report_gcs_uri
):
raise InvalidConfigError(
'detailed_results_report_gcs_uri must include "{}" placeholder in '
'filename for timestamp, e.g. '
'"gs://<your-bucket>/<some-dir>/detailed_report_{}.csv"'
      )


@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class ImageLifecycleManagementConfig:
"""Image Lifecycle Management configuration."""
# Whether to enable dry-run mode. If true, DICOM store changes are skipped.
dry_run: bool
# DICOM store config, including bigquery table and request options.
dicom_store_config: DicomStoreConfig
# Data Access audit logs configuration.
logs_config: DataAccessLogsConfiguration
# List of DICOM instances to exclude from ILM processing.
# Instances should be in the format
# studies/<study UID>/series/<series UID>/instances/<SOP instance UID>,
# e.g.
# instances_disallow_list = ["studies/1.2/series/3.4/instances/5.6", (...)]
instances_disallow_list: Set[str]
# Rules based configuration for changing instances' storage classes.
storage_class_config: StorageClassConfig
# GCS URI to write temporary results to.
tmp_gcs_uri: str
# Configuration for report of storage class updates, which is generated at
# the end of the pipeline execution.
report_config: ReportConfiguration
def __post_init__(self):
if not self.tmp_gcs_uri:
raise InvalidConfigError('tmp_gcs_uri must be set.')
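

# A minimal end-to-end construction sketch (all values are hypothetical).
# Because these classes are decorated with @dataclasses_json.dataclass_json,
# configs can also be parsed from JSON via
# ImageLifecycleManagementConfig.from_json(...) / .from_dict(...).
#
#   config = ImageLifecycleManagementConfig(
#       dry_run=True,
#       dicom_store_config=DicomStoreConfig(
#           dicom_store_path=(
#               'projects/my-project/locations/us-central1/datasets/my-dataset'
#               '/dicomStores/my-store'
#           ),
#           dicom_store_bigquery_table='my-project.my_dataset.dicom_metadata',
#       ),
#       logs_config=DataAccessLogsConfiguration(
#           logs_bigquery_table='my-project.my_dataset.data_access_logs*',
#       ),
#       instances_disallow_list=set(),
#       storage_class_config=StorageClassConfig(
#           move_rules=[
#               MoveRule(
#                   from_storage_class=StorageClass.STANDARD,
#                   to_storage_class=StorageClass.ARCHIVE,
#                   downgrade_conditions=[
#                       ToLowerAvailabilityCondition(modality='SM'),
#                   ],
#               ),
#           ],
#       ),
#       tmp_gcs_uri='gs://my-bucket/ilm-tmp',
#       report_config=ReportConfiguration(
#           summarized_results_report_gcs_uri=(
#               'gs://my-bucket/reports/summarized_report_{}.csv'
#           ),
#       ),
#   )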