# ilm/ilm_lib/heuristics.py (204 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""ILM heuristics to determine storage class changes based on metadata."""
import collections
import datetime
import logging
from typing import Any, Iterable, List, Optional, Set
import apache_beam as beam
from ilm import ilm_config
from ilm import ilm_types
def _convert_to_date(date_str: Optional[str]) -> Optional[datetime.date]:
    """Parses a compact `YYYYMMDD` string into a date.

    Args:
        date_str: Date formatted as `%Y%m%d`, or None / empty string.

    Returns:
        The parsed date, or None when `date_str` is None or empty.

    Raises:
        ValueError: If `date_str` is non-empty and does not match `%Y%m%d`.
    """
    if date_str:
        parsed = datetime.datetime.strptime(date_str, '%Y%m%d')
        return parsed.date()
    return None
def _optional_equal(
    metadata_value: Optional[Any], condition_value: Optional[Any]
) -> bool:
    """Evaluates an optional equality criterion.

    Args:
        metadata_value: Value taken from the instance metadata; may be None.
        condition_value: Value required by the condition; None disables the
            criterion.

    Returns:
        True when `condition_value` is None. False when a condition value is
        set but `metadata_value` is None. Otherwise the result of
        `metadata_value == condition_value`.
    """
    criterion_unset = condition_value is None
    return criterion_unset or (
        metadata_value is not None and metadata_value == condition_value
    )
def _optional_in(
    metadata_value: Optional[Any], condition_value: Optional[Set[Any]]
) -> bool:
    """Evaluates an optional membership criterion.

    Args:
        metadata_value: Value taken from the instance metadata; may be None.
        condition_value: Set of accepted values; None disables the criterion.

    Returns:
        True when `condition_value` is None. False when a set is given but
        `metadata_value` is None. Otherwise whether `metadata_value` is a
        member of `condition_value`.
    """
    if condition_value is not None:
        if metadata_value is None:
            return False
        return metadata_value in condition_value
    return True
def _optional_larger(
    metadata_value: Optional[Any], condition_value: Optional[Any]
) -> bool:
    """Evaluates an optional strict "greater than" criterion.

    Args:
        metadata_value: Value taken from the instance metadata; may be None.
        condition_value: Exclusive lower bound; None disables the criterion.

    Returns:
        True when `condition_value` is None. False when a bound is set but
        `metadata_value` is None. Otherwise the result of
        `metadata_value > condition_value`.
    """
    if condition_value is not None:
        if metadata_value is None:
            return False
        return metadata_value > condition_value
    return True
def _optional_larger_or_equal(
    metadata_value: Optional[Any], condition_value: Optional[Any]
) -> bool:
    """Evaluates an optional "greater than or equal" criterion.

    Args:
        metadata_value: Value taken from the instance metadata; may be None.
        condition_value: Inclusive lower bound; None disables the criterion.

    Returns:
        True when `condition_value` is None. False when a bound is set but
        `metadata_value` is None. Otherwise the result of
        `metadata_value >= condition_value`.
    """
    return condition_value is None or (
        metadata_value is not None and metadata_value >= condition_value
    )
def _optional_smaller(
    metadata_value: Optional[Any], condition_value: Optional[Any]
) -> bool:
    """Evaluates an optional strict "less than" criterion.

    Args:
        metadata_value: Value taken from the instance metadata; may be None.
        condition_value: Exclusive upper bound; None disables the criterion.

    Returns:
        True when `condition_value` is None. False when a bound is set but
        `metadata_value` is None. Otherwise the result of
        `metadata_value < condition_value`.
    """
    if condition_value is None:
        return True
    return False if metadata_value is None else metadata_value < condition_value
def _optional_smaller_or_equal(
    metadata_value: Optional[Any], condition_value: Optional[Any]
) -> bool:
    """Evaluates an optional "less than or equal" criterion.

    Args:
        metadata_value: Value taken from the instance metadata; may be None.
        condition_value: Inclusive upper bound; None disables the criterion.

    Returns:
        True when `condition_value` is None. False when a bound is set but
        `metadata_value` is None. Otherwise the result of
        `metadata_value <= condition_value`.
    """
    if condition_value is None:
        return True
    if metadata_value is not None:
        return metadata_value <= condition_value
    return False
def _is_in_range(
    pixel_spacing: Optional[float],
    pixel_spacing_range: Optional[ilm_config.PixelSpacingRange],
) -> bool:
    """Whether pixel spacing is in expected range (min, max inclusive).

    Args:
        pixel_spacing: Pixel spacing from the instance metadata; may be None.
        pixel_spacing_range: Range criterion exposing `min` and `max`
            attributes. None disables the criterion. A bound that is None
            leaves that side of the range open.

    Returns:
        True when no range is given, or when `pixel_spacing` lies within the
        configured bounds (both inclusive). False when a range is given but
        `pixel_spacing` is None or falls outside the bounds.
    """
    if pixel_spacing_range is None:
        return True
    if pixel_spacing is None:
        return False
    lower_bound = pixel_spacing_range.min
    upper_bound = pixel_spacing_range.max
    # Bounds are compared against None explicitly: a truthiness test would
    # treat a configured bound of 0 as "no bound" and skip the check.
    if lower_bound is not None and pixel_spacing < lower_bound:
        return False
    if upper_bound is not None and pixel_spacing > upper_bound:
        return False
    return True
def _should_upgrade(
    metadata: ilm_types.InstanceMetadata,
    metadata_date: Optional[datetime.date],
    upgrade_conditions: Optional[
        List[ilm_config.ToHigherAvailabilityCondition]
    ] = None,
) -> Optional[int]:
    """Returns the index of the first upgrade condition the instance satisfies.

    Conditions are evaluated in list order. A condition is satisfied only when
    every criterion configured on it holds; criteria left as None on the
    condition are not checked.

    Args:
        metadata: Instance metadata to evaluate. Must carry access metadata.
        metadata_date: Date associated with the instance, compared against
            each condition's `date_after`; may be None.
        upgrade_conditions: Conditions to try, in order. None or an empty list
            means there is nothing to match.

    Returns:
        Index of the first satisfied condition, or None if no condition is
        satisfied.

    Raises:
        ValueError: If `metadata.access_metadata` is missing, or if a
            condition's `date_after` is not a valid `%Y%m%d` string.
    """
    if not metadata.access_metadata:
        raise ValueError(f'Instance metadata missing access info: {metadata}')
    # The parameter defaults to None, which cannot be enumerated; treat it the
    # same as an empty list of conditions.
    for condition_index, condition in enumerate(upgrade_conditions or ()):
        condition_date_after = _convert_to_date(condition.date_after)
        access_criterion = condition.access_count_higher_or_equal_to
        if access_criterion:
            # Access count is looked up over the window the condition asks for.
            condition_access_count_higher_or_equal_to = access_criterion.count
            metadata_access_count = metadata.access_metadata.get_access_count(
                access_criterion.num_days
            )
        else:
            # No access criterion configured: both sides None, check passes.
            metadata_access_count = None
            condition_access_count_higher_or_equal_to = None
        if all([
            _optional_equal(metadata.modality, condition.modality),
            _optional_equal(metadata.sop_class_uid, condition.sop_class_uid),
            _optional_in(metadata.image_type, condition.image_type),
            _is_in_range(metadata.pixel_spacing, condition.pixel_spacing_range),
            # Instance date must be strictly after the condition date.
            _optional_larger(metadata_date, condition_date_after),
            _optional_smaller(
                metadata.size_bytes, condition.size_bytes_lower_than
            ),
            _optional_larger_or_equal(
                metadata_access_count,
                condition_access_count_higher_or_equal_to,
            ),
            _optional_larger(
                metadata.num_days_in_current_storage_class,
                condition.num_days_in_current_storage_class_higher_than,
            ),
        ]):
            # All of criteria within single condition are satisfied
            return condition_index
    # None of the conditions are satisfied
    return None
def _should_downgrade(
    metadata: ilm_types.InstanceMetadata,
    metadata_date: Optional[datetime.date],
    downgrade_conditions: Optional[
        List[ilm_config.ToLowerAvailabilityCondition]
    ] = None,
) -> Optional[int]:
    """Returns the index of the first downgrade condition the instance satisfies.

    Conditions are evaluated in list order. A condition is satisfied only when
    every criterion configured on it holds; criteria left as None on the
    condition are not checked.

    Args:
        metadata: Instance metadata to evaluate. Must carry access metadata.
        metadata_date: Date associated with the instance, compared against
            each condition's `date_before`; may be None.
        downgrade_conditions: Conditions to try, in order. None or an empty
            list means there is nothing to match.

    Returns:
        Index of the first satisfied condition, or None if no condition is
        satisfied.

    Raises:
        ValueError: If `metadata.access_metadata` is missing, or if a
            condition's `date_before` is not a valid `%Y%m%d` string.
    """
    if not metadata.access_metadata:
        raise ValueError(f'Instance metadata missing access info: {metadata}')
    # The parameter defaults to None, which cannot be enumerated; treat it the
    # same as an empty list of conditions.
    for condition_index, condition in enumerate(downgrade_conditions or ()):
        condition_date_before = _convert_to_date(condition.date_before)
        access_criterion = condition.access_count_lower_or_equal_to
        if access_criterion:
            # Access count is looked up over the window the condition asks for.
            condition_access_count_lower_or_equal_to = access_criterion.count
            metadata_access_count = metadata.access_metadata.get_access_count(
                access_criterion.num_days
            )
        else:
            # No access criterion configured: both sides None, check passes.
            metadata_access_count = None
            condition_access_count_lower_or_equal_to = None
        if all([
            _optional_equal(metadata.modality, condition.modality),
            _optional_equal(metadata.sop_class_uid, condition.sop_class_uid),
            _optional_in(metadata.image_type, condition.image_type),
            _is_in_range(metadata.pixel_spacing, condition.pixel_spacing_range),
            # Instance date must be strictly before the condition date.
            _optional_smaller(metadata_date, condition_date_before),
            _optional_larger(
                metadata.size_bytes, condition.size_bytes_larger_than
            ),
            _optional_smaller_or_equal(
                metadata_access_count,
                condition_access_count_lower_or_equal_to,
            ),
            _optional_larger(
                metadata.num_days_in_current_storage_class,
                condition.num_days_in_current_storage_class_higher_than,
            ),
        ]):
            # All of criteria within single condition are satisfied
            return condition_index
    # None of the conditions are satisfied
    return None
def _get_highest_priority_date(
    metadata: ilm_types.InstanceMetadata,
    dates_priority: List[ilm_config.DateTags],
) -> Optional[datetime.date]:
    """Picks the first available date from metadata following a priority list.

    Args:
        metadata: Instance metadata providing `acquisition_date`,
            `content_date`, `series_date` and `study_date`.
        dates_priority: Date tags ordered from highest to lowest priority.

    Returns:
        The metadata value of the first tag in `dates_priority` whose value is
        not None, or None if every listed tag is unset.
    """
    tags = ilm_config.DateTags
    available_dates = {
        tags.ACQUISITION_DATE: metadata.acquisition_date,
        tags.CONTENT_DATE: metadata.content_date,
        tags.SERIES_DATE: metadata.series_date,
        tags.STUDY_DATE: metadata.study_date,
    }
    # Lazily walk the tags in priority order and stop at the first set value.
    prioritized = (available_dates[tag] for tag in dates_priority)
    return next((value for value in prioritized if value is not None), None)
class ComputeStorageClassChangesDoFn(beam.DoFn):
    """DoFn to determine instances' storage class changes based on move rules."""

    def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
        """Indexes the configured move rules by their source storage class.

        Args:
            ilm_cfg: ILM configuration. Its `storage_class_config.move_rules`
                and `storage_class_config.date_priority` are read.
        """
        super().__init__()
        # Maps a source storage class to its (rule_index, rule) pairs, kept in
        # configuration order; rule_index is the position in the full list.
        self._from_storage_class_to_rules = collections.defaultdict(list)
        move_rules = ilm_cfg.storage_class_config.move_rules
        for rule_index, rule in enumerate(move_rules):
            self._from_storage_class_to_rules[rule.from_storage_class].append(
                (rule_index, rule)
            )
        self._dates_priority = ilm_cfg.storage_class_config.date_priority

    def process(
        self, metadata: ilm_types.InstanceMetadata
    ) -> Iterable[ilm_types.StorageClassChange]:
        """Applies config move rules and returns metadata with new storage class.

        Args:
            metadata: Metadata of the instance to evaluate.

        Yields:
            At most one storage class change, built from the first move rule
            (in configuration order) that has a satisfied condition. Nothing
            is yielded when no rule matches.
        """
        # Use `.get` rather than indexing so that looking up a storage class
        # without rules does not insert an empty entry into the defaultdict.
        move_rules = self._from_storage_class_to_rules.get(
            metadata.storage_class, ()
        )
        metadata_date = _get_highest_priority_date(
            metadata, self._dates_priority
        )
        detected_move_rules = []
        storage_class_change = None
        for rule_index, rule in move_rules:
            # A rule with upgrade conditions is treated as an upgrade rule;
            # otherwise its downgrade conditions are evaluated.
            if rule.upgrade_conditions:
                condition_index = _should_upgrade(
                    metadata, metadata_date, rule.upgrade_conditions
                )
            else:
                condition_index = _should_downgrade(
                    metadata, metadata_date, rule.downgrade_conditions
                )
            if condition_index is None:
                continue
            detected_move_rules.append(rule_index)
            # Only the first matching rule decides the change; later matches
            # are recorded solely for the warning below.
            if storage_class_change is None:
                storage_class_change = ilm_types.StorageClassChange(
                    instance=metadata.instance,
                    new_storage_class=rule.to_storage_class,
                    move_rule_id=ilm_types.MoveRuleId(
                        rule_index=rule_index, condition_index=condition_index
                    ),
                )
        if len(detected_move_rules) > 1:
            logging.warning(
                'Multiple move rules detected for instance %s: %s',
                metadata.instance,
                detected_move_rules,
            )
        if storage_class_change is not None:
            yield storage_class_change