ilm/ilm_lib/dicom_store_lib.py

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""DICOM store utilities for ILM."""
import collections
import dataclasses
import datetime
import json
import logging
import os
import time
from typing import Any, Iterable, List, Mapping, Optional, Tuple
import urllib.parse
import uuid

import apache_beam as beam
import google.auth
import google.auth.transport.requests
from google.cloud import bigquery
from google.cloud import storage
import requests

from ilm import ilm_config
from ilm import ilm_types
from ilm.ilm_lib import pipeline_util

# Required DICOM Store BigQuery table columns.
_BLOB_STORAGE_SIZE = 'BlobStorageSize'
_LAST_UPDATED = 'LastUpdated'
_LAST_UPDATED_STORAGE_CLASS = 'LastUpdatedStorageClass'
_SERIES_INSTANCE_UID = 'SeriesInstanceUID'
_SOP_INSTANCE_UID = 'SOPInstanceUID'
_STORAGE_CLASS = 'StorageClass'
_STUDY_INSTANCE_UID = 'StudyInstanceUID'
_TYPE = 'Type'

# Optional DICOM Store BigQuery table columns.
_ACQUISITION_DATE = 'AcquisitionDate'
_CONTENT_DATE = 'ContentDate'
_IMAGE_TYPE = 'ImageType'
_MODALITY = 'Modality'
_NUMBER_OF_FRAMES = 'NumberOfFrames'
_PIXEL_SPACING = 'PixelSpacing'
_SERIES_DATE = 'SeriesDate'
_SOP_CLASS_UID = 'SOPClassUID'
_STUDY_DATE = 'StudyDate'
_OPTIONAL_DICOM_STORE_METADATA_COLUMNS = frozenset([
    _ACQUISITION_DATE,
    _CONTENT_DATE,
    _IMAGE_TYPE,
    _MODALITY,
    _NUMBER_OF_FRAMES,
    _PIXEL_SPACING,
    _SERIES_DATE,
    _SOP_CLASS_UID,
    _STUDY_DATE,
])

# BigQuery query keywords
_INSTANCE_DICOM_WEB_PATH = 'InstanceDicomWebPath'


class _DicomTags:
  SERIES_INSTANCE_UID = '0020000E'
  SOP_INSTANCE_UID = '00080018'


@dataclasses.dataclass(frozen=True)
class _StorageClassResult:
  succeeded_count: int
  failed_count: int
  unfinished_count: int
  succeeded_instances: List[str]
  failed_instances: List[str]
  unfinished_instances: List[str]


class DicomStoreError(Exception):
  pass


def _get_optional_dicom_store_metadata_table_columns(
    ilm_cfg: ilm_config.ImageLifecycleManagementConfig,
) -> List[str]:
  """Fetches metadata columns present in DICOM Store BigQuery table."""
  project_id, dataset_id, table_id = (
      ilm_cfg.dicom_store_config.dicom_store_bigquery_table.split('.')
  )
  client = bigquery.Client(project=project_id)
  table_ref = client.dataset(dataset_id).table(table_id)
  table = client.get_table(table_ref)
  metadata_columns = []
  for field in table.schema:
    if field.name in _OPTIONAL_DICOM_STORE_METADATA_COLUMNS:
      metadata_columns.append(field.name)
  return metadata_columns


def _metadata_columns_select(metadata_columns: List[str]) -> str:
  return (
      ', '.join(metadata_columns)
      .replace(
          _PIXEL_SPACING,
          f'{_PIXEL_SPACING}[SAFE_OFFSET(0)] AS {_PIXEL_SPACING}',
      )
      .replace(
          _IMAGE_TYPE, f'ARRAY_TO_STRING({_IMAGE_TYPE}, "/") AS {_IMAGE_TYPE}'
      )
  )


def _metadata_columns_group_by(metadata_columns: List[str]) -> str:
  return (
      ', '.join(metadata_columns)
      .replace(_PIXEL_SPACING, f'{_PIXEL_SPACING}[SAFE_OFFSET(0)]')
      .replace(_IMAGE_TYPE, f'ARRAY_TO_STRING({_IMAGE_TYPE}, "/")')
  )

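
# Illustrative example (not part of the original module): for a hypothetical
# column list ['Modality', 'PixelSpacing', 'ImageType'],
# _metadata_columns_select returns
#   'Modality, PixelSpacing[SAFE_OFFSET(0)] AS PixelSpacing, '
#   'ARRAY_TO_STRING(ImageType, "/") AS ImageType'
# while _metadata_columns_group_by returns the same expressions without the
# AS aliases, matching the SELECT / GROUP BY pair built below.
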

def get_dicom_store_query(
    ilm_cfg: ilm_config.ImageLifecycleManagementConfig,
):
  """Generates query to fetch DICOM metadata from BigQuery table."""
  metadata_columns = _get_optional_dicom_store_metadata_table_columns(ilm_cfg)
  metadata_columns.extend([_BLOB_STORAGE_SIZE, _STORAGE_CLASS, _TYPE])
  metadata_columns_select = _metadata_columns_select(metadata_columns)
  metadata_columns_group_by = _metadata_columns_group_by(metadata_columns)
  partition_by_uid_triple = (
      f'PARTITION BY {_STUDY_INSTANCE_UID}, {_SERIES_INSTANCE_UID}, '
      f'{_SOP_INSTANCE_UID}'
  )
  return (
      'SELECT '
      '  CONCAT( '
      f'    "studies/", {_STUDY_INSTANCE_UID}, '
      f'    "/series/", {_SERIES_INSTANCE_UID}, '
      f'    "/instances/", {_SOP_INSTANCE_UID}) AS {_INSTANCE_DICOM_WEB_PATH}, '
      f'  {metadata_columns_select}, '
      f'  MAX({_LAST_UPDATED}) AS {_LAST_UPDATED}, '
      f'  MAX({_LAST_UPDATED_STORAGE_CLASS}) AS {_LAST_UPDATED_STORAGE_CLASS} '
      'FROM ( '
      '  SELECT '
      '    *, '
      f'    MAX({_LAST_UPDATED}) '
      f'      OVER ({partition_by_uid_triple}) AS MaxLastUpdated, '
      '    CASE '
      f'      WHEN {_TYPE}="CREATE" OR {_STORAGE_CLASS}!=LAG({_STORAGE_CLASS}) '
      f'        OVER ({partition_by_uid_triple} ORDER BY {_LAST_UPDATED} ASC) '
      f'      THEN {_LAST_UPDATED} '
      '      ELSE NULL '
      f'    END AS {_LAST_UPDATED_STORAGE_CLASS} '
      f'  FROM `{ilm_cfg.dicom_store_config.dicom_store_bigquery_table}` '
      ') '
      'WHERE '
      f'  {_LAST_UPDATED} = MaxLastUpdated '
      f'  AND {_STORAGE_CLASS} IS NOT NULL '
      f'  AND {_TYPE}!="DELETE" '
      f'GROUP BY {_STUDY_INSTANCE_UID}, {_SERIES_INSTANCE_UID}, '
      f'  {_SOP_INSTANCE_UID}, {metadata_columns_group_by}'
  )


def parse_dicom_metadata(
    raw_metadata: Mapping[str, Any],
    today: datetime.datetime = datetime.datetime.now(tz=datetime.timezone.utc),
) -> Tuple[str, ilm_types.InstanceMetadata]:
  """Parses metadata from DICOM store table query result."""
  try:
    instance = raw_metadata[_INSTANCE_DICOM_WEB_PATH]
    pixel_spacing = (
        float(raw_metadata[_PIXEL_SPACING])
        if raw_metadata.get(_PIXEL_SPACING)
        else None
    )
    last_updated_storage_class = raw_metadata[_LAST_UPDATED_STORAGE_CLASS]
    if last_updated_storage_class is None:
      logging.info(
          'Missing last updated for storage class in DICOM Store BigQuery '
          'table for instance %s. Using last updated timestamp instead.',
          instance,
      )
      last_updated_storage_class = raw_metadata[_LAST_UPDATED]
    days_in_current_storage_class = (today - last_updated_storage_class).days
    metadata = ilm_types.InstanceMetadata(
        instance=instance,
        modality=raw_metadata.get(_MODALITY, None),
        num_frames=int(raw_metadata.get(_NUMBER_OF_FRAMES) or 0),
        pixel_spacing=pixel_spacing,
        sop_class_uid=raw_metadata.get(_SOP_CLASS_UID, ''),
        acquisition_date=raw_metadata.get(_ACQUISITION_DATE, None),
        content_date=raw_metadata.get(_CONTENT_DATE, None),
        series_date=raw_metadata.get(_SERIES_DATE, None),
        study_date=raw_metadata.get(_STUDY_DATE, None),
        image_type=raw_metadata.get(_IMAGE_TYPE, None),
        size_bytes=raw_metadata[_BLOB_STORAGE_SIZE],
        storage_class=ilm_config.StorageClass(raw_metadata[_STORAGE_CLASS]),
        num_days_in_current_storage_class=days_in_current_storage_class,
    )
  except (KeyError, ValueError, TypeError) as e:
    raise RuntimeError(
        f'Invalid metadata in DICOM Store BigQuery table: {raw_metadata}'
    ) from e
  return (instance, metadata)

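
# Illustrative sketch (hypothetical values): a query result row consumed by
# parse_dicom_metadata is expected to look roughly like
#   {
#       'InstanceDicomWebPath': 'studies/1.2.3/series/4.5.6/instances/7.8.9',
#       'Modality': 'SM',
#       'NumberOfFrames': 10,
#       'BlobStorageSize': 123456,
#       'StorageClass': 'STANDARD',
#       'LastUpdated': <datetime>,
#       'LastUpdatedStorageClass': <datetime or None>,
#       ...
#   }
# with keys matching the column constants defined above.
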

class GenerateFilterFilesDoFn(beam.DoFn):
  """DoFn to generate filter files for DICOM instances storage class changes."""

  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._ilm_cfg = ilm_cfg
    self._dry_run = ilm_cfg.dry_run
    self._storage_client = None

  def setup(self):
    self._storage_client = storage.Client()

  def process(
      self,
      storage_class_to_changes: Tuple[
          ilm_config.StorageClass, Iterable[ilm_types.StorageClassChange]
      ],
  ) -> Iterable[ilm_types.SetStorageClassRequestMetadata]:
    new_storage_class, changes = storage_class_to_changes
    filter_file_gcs_uri = os.path.join(
        self._ilm_cfg.tmp_gcs_uri, f'instances-filter-{uuid.uuid4()}.txt'
    )
    changes = list(changes)
    if not self._dry_run:
      instances_str = '\n'.join([change.instance for change in changes])
      pipeline_util.write_gcs_file(
          file_content=instances_str,
          gcs_uri=filter_file_gcs_uri,
          storage_client=self._storage_client,
      )
    if self._ilm_cfg.report_config.detailed_results_report_gcs_uri:
      instances = [change.instance for change in changes]
    else:
      instances = []
    yield ilm_types.SetStorageClassRequestMetadata(
        [change.move_rule_id for change in changes],
        instances,
        filter_file_gcs_uri,
        new_storage_class,
    )


class DicomStoreClient:
  """DICOM store client for storage class operations."""

  _HEALTHCARE_API = 'https://healthcare.googleapis.com/v1beta1/'
  _OPERATION_NAME_KEY = 'name'
  _OPERATION_STATUS_KEY = 'done'

  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._dicom_store_resource = ilm_cfg.dicom_store_config.dicom_store_path
    self._dicom_store_path = urllib.parse.urljoin(
        DicomStoreClient._HEALTHCARE_API, self._dicom_store_resource.lstrip('/')
    ).rstrip('/')
    self._auth_credentials = None

  def _add_auth_to_header(
      self, msg_headers: Mapping[str, str]
  ) -> Mapping[str, str]:
    """Updates credentials and returns header with authentication added."""
    if self._auth_credentials is None:
      self._auth_credentials = google.auth.default(
          scopes=['https://www.googleapis.com/auth/cloud-platform']
      )[0]
    if not self._auth_credentials.valid:
      auth_req = google.auth.transport.requests.Request()
      self._auth_credentials.refresh(auth_req)
    self._auth_credentials.apply(msg_headers)
    return msg_headers

  def _get_tag_value(
      self, instance_metadata: Mapping[str, Any], tag: str
  ) -> Optional[Any]:
    try:
      return instance_metadata[tag]['Value'][0]
    except (KeyError, IndexError):
      return None

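  # Illustrative sketch (hypothetical UID values): _get_tag_value above expects
  # DICOMweb JSON metadata of the form returned by the instances search below,
  # e.g.
  #   {'0020000E': {'vr': 'UI', 'Value': ['1.2.3.4']},
  #    '00080018': {'vr': 'UI', 'Value': ['1.2.3.4.5']}}
  # so _get_tag_value(metadata, _DicomTags.SOP_INSTANCE_UID) would return
  # '1.2.3.4.5'.
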
  def fetch_instances(self, dicomweb_path: str) -> List[str]:
    """Fetches instances for given DICOMweb path.

    Args:
      dicomweb_path: path to either a study (i.e. studies/<STUDY>) or a series
        (i.e. studies/<STUDY>/series/<SERIES>) in DICOM store.

    Returns:
      List of full DICOMweb paths of instances.

    Raises:
      DicomStoreError: in case of unexpected or error response from DICOM
        Store.
    """
    uri = os.path.join(
        self._dicom_store_path, 'dicomWeb', dicomweb_path, 'instances'
    )
    headers = self._add_auth_to_header({})
    try:
      response = requests.get(uri, headers=headers)
      response.raise_for_status()
      response_json = response.json()
    except (requests.HTTPError, requests.exceptions.JSONDecodeError) as e:
      raise DicomStoreError(
          f'Failed to get instances metadata for {dicomweb_path}.'
      ) from e
    has_series = 'series/' in dicomweb_path
    instances = []
    for instance_metadata in response_json:
      instance_dicomweb_path = [dicomweb_path]
      if not has_series:
        series_instance_uid = self._get_tag_value(
            instance_metadata, _DicomTags.SERIES_INSTANCE_UID
        )
        instance_dicomweb_path.append(f'series/{series_instance_uid}')
      sop_instance_uid = self._get_tag_value(
          instance_metadata, _DicomTags.SOP_INSTANCE_UID
      )
      instance_dicomweb_path.append(f'instances/{sop_instance_uid}')
      instance_dicomweb_path = '/'.join(instance_dicomweb_path)
      instances.append(instance_dicomweb_path)
    return instances

  def set_blob_storage_settings(
      self,
      request: ilm_types.SetStorageClassRequestMetadata,
  ) -> str:
    """Sends setBlobStorageSettings request with filter file.

    Args:
      request: metadata for sending setBlobStorageSettings request.

    Returns:
      setBlobStorageSettings response operation name.

    Raises:
      DicomStoreError: if request fails.
    """
    uri = f'{self._dicom_store_path}:setBlobStorageSettings'
    headers = self._add_auth_to_header(
        {'Content-Type': 'application/json; charset=utf-8'}
    )
    data = (
        '{{ '
        '  resource: "{dicom_store_resource}",'
        '  filter_config: {{ resourcePathsGcsUri: "{filter_file_gcs_uri}" }},'
        '  blobStorageSettings: {{ blob_storage_class: "{new_storage_class}" }}'
        '}}'.format(
            dicom_store_resource=self._dicom_store_resource,
            filter_file_gcs_uri=request.filter_file_gcs_uri,
            new_storage_class=request.new_storage_class.value,
        )
    )
    try:
      response = requests.post(uri, data=data, headers=headers)
      response.raise_for_status()
      operation = json.loads(response.content)
      return operation[DicomStoreClient._OPERATION_NAME_KEY]
    except (requests.HTTPError, json.JSONDecodeError) as exp:
      raise DicomStoreError(
          f'Failed to change storage class for {len(request.move_rule_ids)} '
          f'instance(s) with error: {exp}'
      ) from exp

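  # Illustrative sketch (hypothetical values): the request body built above
  # renders to a JSON-like payload such as
  #   { resource: "projects/p/locations/l/datasets/d/dicomStores/s",
  #     filter_config: {
  #         resourcePathsGcsUri: "gs://bucket/instances-filter-<uuid>.txt" },
  #     blobStorageSettings: { blob_storage_class: "ARCHIVE" } }
  # where the filter file lists one DICOMweb instance path per line, as
  # written by GenerateFilterFilesDoFn.
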
""" uri = os.path.join(DicomStoreClient._HEALTHCARE_API, operation_name) headers = self._add_auth_to_header({}) try: response = requests.get(uri, headers=headers) response.raise_for_status() operation = json.loads(response.content) return operation[DicomStoreClient._OPERATION_NAME_KEY] except (requests.HTTPError, json.JSONDecodeError) as exp: raise DicomStoreError( f'Failed to get operation status for {operation_name}' ) from exp except KeyError: return False class UpdateStorageClassesDoFn(beam.DoFn): """DoFn to update storage classes of instances in DICOM store.""" def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig): self._dry_run = ilm_cfg.dry_run self._cfg = ilm_cfg self._dicom_store_client = DicomStoreClient(self._cfg) self._throttler = pipeline_util.Throttler( self._cfg.dicom_store_config.max_dicom_store_qps ) def setup(self): self._dicom_store_client = DicomStoreClient(self._cfg) self._throttler = pipeline_util.Throttler( self._cfg.dicom_store_config.max_dicom_store_qps ) def process( self, request: ilm_types.SetStorageClassRequestMetadata ) -> Iterable[ilm_types.SetStorageClassOperationMetadata]: if self._dry_run: logging.info('Dry-run mode. Skipping DICOM store changes.') return [ ilm_types.SetStorageClassOperationMetadata( operation_name='', move_rule_ids=request.move_rule_ids, instances=request.instances, succeeded=True, ) ] try: self._throttler.wait() operation_name = self._dicom_store_client.set_blob_storage_settings( request ) return [ ilm_types.SetStorageClassOperationMetadata( operation_name=operation_name, move_rule_ids=request.move_rule_ids, instances=request.instances, start_time=time.time(), filter_file_gcs_uri=request.filter_file_gcs_uri, ) ] except DicomStoreError as e: logging.error(e) return [ ilm_types.SetStorageClassOperationMetadata( operation_name='', move_rule_ids=request.move_rule_ids, instances=request.instances, succeeded=False, filter_file_gcs_uri=request.filter_file_gcs_uri, ) ] class GenerateReportDoFn(beam.DoFn): """DoFn to generate report of storage class updates after LROs finish.""" def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig): self._dry_run = ilm_cfg.dry_run self._move_rules = ilm_cfg.storage_class_config.move_rules self._operation_timeout_seconds = ( ilm_cfg.dicom_store_config.set_storage_class_timeout_min * 60 ) self._cfg = ilm_cfg def setup(self): self._operations_in_progress = collections.deque() self._operations_done = [] self._storage_class_results = {} timestamp = str(datetime.datetime.now()) self._summarized_report = ( self._cfg.report_config.summarized_results_report_gcs_uri.format( timestamp ) ) if self._cfg.report_config.detailed_results_report_gcs_uri: self._detailed_report = ( self._cfg.report_config.detailed_results_report_gcs_uri.format( timestamp ) ) else: self._detailed_report = '' self._dicom_store_client = DicomStoreClient(self._cfg) self._throttler = pipeline_util.Throttler( target_qps=self._cfg.dicom_store_config.max_dicom_store_qps ) def _wait_for_operations_to_finish(self): """Waits for DICOM store operations to finish. Operations that time out have succeeded status = None. """ logging.info('Waiting for SetBlobStorageSettings operations to finish.') if not self._operations_in_progress: return num_operations_to_verify = len(self._operations_in_progress) success = 0 failure = len(self._operations_done) # Operations that had already failed. timeout = 0 while self._operations_in_progress: if num_operations_to_verify <= 0: # Wait to verify operations again after a full round. 
  def _wait_for_operations_to_finish(self):
    """Waits for DICOM store operations to finish.

    Operations that time out have succeeded status = None.
    """
    logging.info('Waiting for SetBlobStorageSettings operations to finish.')
    if not self._operations_in_progress:
      return
    num_operations_to_verify = len(self._operations_in_progress)
    success = 0
    failure = len(self._operations_done)  # Operations that had already failed.
    timeout = 0
    while self._operations_in_progress:
      if num_operations_to_verify <= 0:
        # Wait to verify operations again after a full round.
        time.sleep(60)
        num_operations_to_verify = len(self._operations_in_progress)
      operation = self._operations_in_progress.popleft()
      try:
        self._throttler.wait()
        if self._dicom_store_client.is_operation_done(
            operation.operation_name
        ):
          operation = dataclasses.replace(operation, succeeded=True)
          self._operations_done.append(operation)
          success += 1
          num_operations_to_verify -= 1
          continue
      except DicomStoreError as e:
        logging.error(e)
        operation = dataclasses.replace(operation, succeeded=False)
        self._operations_done.append(operation)
        failure += 1
        num_operations_to_verify -= 1
        continue
      if time.time() > operation.start_time + self._operation_timeout_seconds:
        self._operations_done.append(operation)
        timeout += 1
        num_operations_to_verify -= 1
        continue
      self._operations_in_progress.append(operation)
      num_operations_to_verify -= 1
    logging.info(
        'Finished waiting for SetBlobStorageSettings operations: '
        '%s succeeded, %s failed, %s timed out.',
        success,
        failure,
        timeout,
    )

  def _cleanup_filter_files(self):
    if not self._cfg.dicom_store_config.set_storage_class_delete_filter_files:
      return
    for operation in self._operations_in_progress:
      if operation.filter_file_gcs_uri:
        pipeline_util.delete_gcs_file(operation.filter_file_gcs_uri)
    for operation in self._operations_done:
      if operation.filter_file_gcs_uri:
        pipeline_util.delete_gcs_file(operation.filter_file_gcs_uri)

  def _compute_results(self):
    """Computes instances results based on status of DICOM store operations."""
    # Rule id to counts
    succeeded = collections.defaultdict(int)
    failed = collections.defaultdict(int)
    unfinished = collections.defaultdict(int)
    succeeded_instances = collections.defaultdict(list)
    failed_instances = collections.defaultdict(list)
    unfinished_instances = collections.defaultdict(list)
    rule_ids = set()
    for operation in self._operations_done:
      if operation.succeeded is None:
        counter = unfinished
        instances = unfinished_instances
      elif operation.succeeded:
        counter = succeeded
        instances = succeeded_instances
      else:
        counter = failed
        instances = failed_instances
      for rule_id in operation.move_rule_ids:
        counter[rule_id] += 1
        rule_ids.add(rule_id)
      if operation.instances and len(operation.instances) == len(
          operation.move_rule_ids
      ):
        for rule_id, instance in zip(
            operation.move_rule_ids, operation.instances
        ):
          instances[rule_id].append(instance)
    for rule_id in rule_ids:
      self._storage_class_results[rule_id] = _StorageClassResult(
          succeeded_count=succeeded[rule_id],
          failed_count=failed[rule_id],
          unfinished_count=unfinished[rule_id],
          succeeded_instances=succeeded_instances[rule_id],
          failed_instances=failed_instances[rule_id],
          unfinished_instances=unfinished_instances[rule_id],
      )

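  # Illustrative sketch (hypothetical values): each row of the summarized
  # report written by _generate_report below is a CSV line of the form
  #   <from_storage_class>,<to_storage_class>,<condition>,<rule_index>,
  #   <condition_index>,<succeeded>,<failed>,<unfinished>
  # e.g. STANDARD,ARCHIVE,<condition repr>,0,1,120,3,0 for a hypothetical
  # rule condition that moved 120 instances successfully, with 3 failures and
  # none unfinished.
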
  def _generate_report(self):
    """Generates report of storage class updates.

    Reports instances' updates and status (success, failure, still running)
    for each move rule and condition in ILM config.
    """
    self._compute_results()
    csv_separator = ','
    header = csv_separator.join([
        'from_storage_class',
        'to_storage_class',
        'condition',
        'rule_index',
        'condition_index',
        'succeeded',
        'failed',
        'unfinished',
    ])
    summarized_report_results = [header]
    detailed_report_results = [header]
    logging.info('Generating storage class updates report(s).')
    # TODO: Also add operation name.
    for rule_index, rule in enumerate(self._move_rules):
      from_storage_class = rule.from_storage_class
      to_storage_class = rule.to_storage_class
      for condition_index, condition in enumerate(rule.conditions):
        rule_id = ilm_types.MoveRuleId(
            rule_index=rule_index, condition_index=condition_index
        )
        if rule_id not in self._storage_class_results:
          continue
        condition_results = self._storage_class_results[rule_id]
        logging.info(
            'Instances moved from %s to %s due to %s (move rule %s, condition '
            '%s): %s succeeded, %s failed, %s unfinished',
            from_storage_class.value,
            to_storage_class.value,
            condition,
            rule_index,
            condition_index,
            condition_results.succeeded_count,
            condition_results.failed_count,
            condition_results.unfinished_count,
        )
        summarized_report_results.append(
            csv_separator.join([
                str(i)
                for i in [
                    from_storage_class.value,
                    to_storage_class.value,
                    condition,
                    rule_index,
                    condition_index,
                    condition_results.succeeded_count,
                    condition_results.failed_count,
                    condition_results.unfinished_count,
                ]
            ]),
        )
        if self._detailed_report:
          detailed_report_results.append(
              csv_separator.join([
                  str(i)
                  for i in [
                      from_storage_class.value,
                      to_storage_class.value,
                      condition,
                      rule_index,
                      condition_index,
                      f'"{condition_results.succeeded_instances}"',
                      f'"{condition_results.failed_instances}"',
                      f'"{condition_results.unfinished_instances}"',
                  ]
              ])
          )
    summarized_report_results_str = '\n'.join(summarized_report_results)
    pipeline_util.write_gcs_file(
        file_content=summarized_report_results_str,
        gcs_uri=self._summarized_report,
    )
    logging.info(
        'Wrote summarized report of updates to %s.', self._summarized_report
    )
    if self._detailed_report:
      detailed_report_results_str = '\n'.join(detailed_report_results)
      pipeline_util.write_gcs_file(
          file_content=detailed_report_results_str,
          gcs_uri=self._detailed_report,
      )
      logging.info(
          'Wrote detailed report of updates to %s.', self._detailed_report
      )

  def process(
      self,
      key_to_operations: Tuple[
          int, Iterable[ilm_types.SetStorageClassOperationMetadata]
      ],
  ) -> None:
    _, operations = key_to_operations
    if self._dry_run:
      self._operations_done = collections.deque(operations)
    else:
      for op in operations:
        if op.succeeded is None:
          self._operations_in_progress.append(op)
        else:
          self._operations_done.append(op)
      self._wait_for_operations_to_finish()
    self._cleanup_filter_files()
    self._generate_report()
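
# Illustrative usage sketch (assumed pipeline wiring, not part of this module):
# the DoFns above are intended to be chained in an Apache Beam pipeline,
# roughly as
#   changes_by_storage_class
#       | beam.ParDo(GenerateFilterFilesDoFn(cfg))
#       | beam.ParDo(UpdateStorageClassesDoFn(cfg))
#       | ...  # group all operation metadata onto a single key
#       | beam.ParDo(GenerateReportDoFn(cfg))
# where GenerateFilterFilesDoFn consumes (StorageClass, changes) pairs,
# UpdateStorageClassesDoFn issues setBlobStorageSettings requests, and
# GenerateReportDoFn waits for the resulting operations and writes the CSV
# reports.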