ez_wsi_dicomweb/ml_toolkit/dicom_path.py (373 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Utilities for DICOMweb path manipulation.""" from __future__ import annotations import dataclasses import enum import re from typing import Any, Match, Optional import urllib.parse import dataclasses_json class Type(enum.Enum): """Type of a resource the path points to.""" STORE = 'store' STUDY = 'study' SERIES = 'series' INSTANCE = 'instance' # Used for project ID and location validation _REGEX_ID_1_TXT = r'[\w-]+' # Used for dataset ID and dicom store ID validation _REGEX_ID_2_TXT = r'[\w.-]+' # Used for DICOM UIDs validation # '/' is not allowed because the parsing logic in the class uses '/' to # tokenize the path. # '@' is not allowed due to security concerns: theoretically it could lead # to the part before '@' being interpreted as the username, and the part # after - as the server address, which is a potential vulnerability. _REGEX_UID_TXT = r'[^/@]+' _REGEX_BASE_ADDRESS = re.compile(r'https?://.+') _REGEX_ID_1 = re.compile(_REGEX_ID_1_TXT) _REGEX_ID_2 = re.compile(_REGEX_ID_2_TXT) _REGEX_UID = re.compile(_REGEX_UID_TXT) _REGEX_STORE = re.compile( r'projects/(%s)/locations/(%s)/datasets/(%s)/dicomStores/(%s)' r'(.*)' % (_REGEX_ID_1_TXT, _REGEX_ID_1_TXT, _REGEX_ID_2_TXT, _REGEX_ID_2_TXT) ) _REGEX_STUDIES = re.compile(r'((.+)/)?studies/(%s)(.*)' % _REGEX_UID_TXT) _REGEX_SERIES = re.compile(r'series/(%s)(.*)' % _REGEX_UID_TXT) _REGEX_INSTANCE = re.compile(r'instances/(%s)/?$' % _REGEX_UID_TXT) _HEALTHCARE_API_URL = 'https://healthcare.googleapis.com' _DEFAULT_HEALTHCARE_API_VERSION = 'v1' def DicomPathJoin(*args: str) -> str: return '/'.join([arg.strip('/') for arg in args if arg]) @dataclasses_json.dataclass_json @dataclasses.dataclass(frozen=True) class Path: """Represents a path to a DICOM Store or a DICOM resource in CHC API. Attributes: base_address: base address for url healthcare_api_version: healthcare api version project_id: Project ID. location: Location. dataset_id: Dataset ID. store_id: DICOM Store ID. study_prefix: prefix to study UID. study_uid: DICOM Study UID. series_uid: DICOM Series UID. instance_uid: DICOM Instance UID. """ base_address: str healthcare_api_version: str project_id: str location: str dataset_id: str store_id: str study_prefix: str study_uid: str series_uid: str instance_uid: str def __post_init__(self) -> None: """Validates path configuration. Returns: None Raises: ValueError: Invalid configuration. """ if _REGEX_BASE_ADDRESS.fullmatch(self.base_address) is None: raise ValueError('Invalid base_address') if ( self.healthcare_api_version and _REGEX_ID_1.fullmatch(self.healthcare_api_version) is None ): raise ValueError('Healthcare API version') if self.project_id and _REGEX_ID_1.fullmatch(self.project_id) is None: raise ValueError('Invalid project_id') if self.location and _REGEX_ID_1.fullmatch(self.location) is None: raise ValueError('Invalid location') if self.dataset_id and _REGEX_ID_2.fullmatch(self.dataset_id) is None: raise ValueError('Invalid dataset_id') if self.store_id and _REGEX_ID_2.fullmatch(self.store_id) is None: raise ValueError('Invalid store_id') id_count_defined = sum([ 1 for id in [ self.project_id, self.location, self.dataset_id, self.store_id, ] if id ]) if ( self.study_prefix != 'dicomWeb' and self.base_address == _HEALTHCARE_API_URL ): raise ValueError('Invalid study_prefix') if id_count_defined != 0 and id_count_defined != 4: raise ValueError('Invalid id') if self.study_uid and _REGEX_UID.fullmatch(self.study_uid) is None: raise ValueError('Invalid study_uid') if self.series_uid and _REGEX_UID.fullmatch(self.series_uid) is None: raise ValueError('Invalid series_uid') if self.instance_uid and _REGEX_UID.fullmatch(self.instance_uid) is None: raise ValueError('Invalid instance_uid') self._StudyUidMissing(self.study_uid) self._SeriesUidMissing(self.series_uid) def _StudyUidMissing(self, value: str) -> None: if not value: if self.series_uid or self.instance_uid: raise ValueError( 'study_uid missing with non-empty series_uid or instance_uid.' f' series_uid: {self.series_uid}, instance_uid: {self.instance_uid}' ) def _SeriesUidMissing(self, value: str) -> None: if not value: if self.instance_uid: raise ValueError( 'series_uid missing with non-empty instance_uid. instance_uid:' f' {self.instance_uid}' ) def _BuildGoogleStorePath(self) -> str: """Returns component of path identifying google DICOM store.""" if not self.project_id: return '' else: return DicomPathJoin( 'projects', self.project_id, 'locations', self.location, 'datasets', self.dataset_id, 'dicomStores', self.store_id, ) def _BuildUidPath(self) -> str: """Returns UID component of path to imaging in DICOM store.""" if not self.study_uid: return self.study_prefix study_path_str = DicomPathJoin(self.study_prefix, 'studies', self.study_uid) if not self.series_uid: return study_path_str series_path_str = DicomPathJoin(study_path_str, 'series', self.series_uid) if not self.instance_uid: return series_path_str return DicomPathJoin(series_path_str, 'instances', self.instance_uid) @property def complete_url(self) -> str: """Returns the complete url of the path.""" return DicomPathJoin( self.base_address, self.healthcare_api_version, self._BuildGoogleStorePath(), self._BuildUidPath(), ) def __eq__(self, other: Any) -> bool: if isinstance(other, Path): return self.complete_url == other.complete_url if isinstance(other, str): return self.complete_url == other return False def __str__(self): """Returns the text representation of the path.""" return self.complete_url @property def type(self) -> Type: """Type of the DICOM resource corresponding to the path.""" if not self.study_uid: return Type.STORE elif not self.series_uid: return Type.STUDY elif not self.instance_uid: return Type.SERIES return Type.INSTANCE def GetStorePath(self) -> Path: """Returns the sub-path for the DICOM Store within this path.""" return Path( self.base_address, self.healthcare_api_version, self.project_id, self.location, self.dataset_id, self.store_id, self.study_prefix, '', '', '', ) def GetStudyPath(self) -> Path: """Returns the sub-path for the DICOM Study within this path.""" if self.type == Type.STORE: raise ValueError("Can't get a study path from a store path.") return Path( self.base_address, self.healthcare_api_version, self.project_id, self.location, self.dataset_id, self.store_id, self.study_prefix, self.study_uid, '', '', ) def GetSeriesPath(self) -> Path: """Returns the sub-path for the DICOM Series within this path.""" if self.type in (Type.STORE, Type.STUDY): raise ValueError(f"Can't get a series path from a {self.type} path.") return Path( self.base_address, self.healthcare_api_version, self.project_id, self.location, self.dataset_id, self.store_id, self.study_prefix, self.study_uid, self.series_uid, '', ) def _MatchRegex(regex: re.Pattern[str], text_str: str, error_str) -> Match[str]: """Matches the regex and returns the match or raises ValueError if failed.""" match = regex.match(text_str) if match is None: raise ValueError(error_str) return match def _FromString(path_str: str) -> Path: """Parses the string and returns the Path object or raises ValueError if failed.""" match_err_str = f'Error parsing the path. Path: {path_str}' prased_url = urllib.parse.urlparse(path_str) if not prased_url.scheme: base_address = _HEALTHCARE_API_URL healthcare_api_version = _DEFAULT_HEALTHCARE_API_VERSION else: # check if full url has been provided if prased_url.scheme.lower() not in ('http', 'https'): raise ValueError(match_err_str) if not prased_url.netloc: raise ValueError(match_err_str) base_address = f'{prased_url.scheme}://{prased_url.netloc}' path_str = prased_url.path if base_address.lower() != _HEALTHCARE_API_URL: healthcare_api_version = '' else: path_str = path_str.strip('/') path_str_parts = path_str.split('/') healthcare_api_version = path_str_parts[0] if not healthcare_api_version: raise ValueError(match_err_str) path_str = '/'.join(path_str_parts[1:]) path_str = path_str.strip('/') is_healthcare_api_url = base_address.lower() == _HEALTHCARE_API_URL if is_healthcare_api_url: store_match = _MatchRegex(_REGEX_STORE, path_str, match_err_str) project_id = store_match.group(1) location = store_match.group(2) dataset_id = store_match.group(3) store_id = store_match.group(4) store_path_suffix = store_match.group(5) study_prefix = 'dicomWeb' else: project_id = '' location = '' dataset_id = '' store_id = '' study_prefix = '' store_path_suffix = path_str if not store_path_suffix: return Path( base_address, healthcare_api_version, project_id, location, dataset_id, store_id, study_prefix, '', '', '', ) try: studies_match = _MatchRegex( _REGEX_STUDIES, store_path_suffix, match_err_str ) except ValueError: store_path_suffix = store_path_suffix.strip().strip('/') if store_path_suffix: study_prefix = store_path_suffix if is_healthcare_api_url and study_prefix != 'dicomWeb': raise return Path( base_address, healthcare_api_version, project_id, location, dataset_id, store_id, study_prefix, '', '', '', ) study_prefix = studies_match.group(2) if study_prefix is None: study_prefix = '' if study_prefix: study_prefix = study_prefix.strip('/') if is_healthcare_api_url and study_prefix != 'dicomWeb': raise ValueError(match_err_str) study_uid = studies_match.group(3) study_path_suffix = studies_match.group(4) if not study_path_suffix: return Path( base_address, healthcare_api_version, project_id, location, dataset_id, store_id, study_prefix, study_uid, '', '', ) study_path_suffix = study_path_suffix.strip('/') series_match = _MatchRegex(_REGEX_SERIES, study_path_suffix, match_err_str) series_uid = series_match.group(1) series_path_suffix = series_match.group(2) series_path_suffix = series_path_suffix.strip('/') if not series_path_suffix: return Path( base_address, healthcare_api_version, project_id, location, dataset_id, store_id, study_prefix, study_uid, series_uid, '', ) instance_match = _MatchRegex( _REGEX_INSTANCE, series_path_suffix, match_err_str ) instance_uid = instance_match.group(1) return Path( base_address, healthcare_api_version, project_id, location, dataset_id, store_id, study_prefix, study_uid, series_uid, instance_uid, ) def FromString(path_str: str, path_type: Optional[Type] = None) -> Path: """Parses the string and returns the Path object or raises ValueError if failed. Args: path_str: The string containing the path. path_type: The expected type of the path or None if no specific type is expected. Returns: The newly constructed Path object. Raises: ValueError if the path cannot be parsed or the actual path type doesn't match the specified expected type. """ path = _FromString(path_str) # Validate that the path is of the right type of the type is specified. if path_type is not None and path.type != path_type: raise ValueError( f'Unexpected path type. Expected: {path_type}, actual: {path.type}.' f' Path: {path_str}' ) return path def FromPath( base_path: Path, store_id: Optional[str] = None, study_uid: Optional[str] = None, series_uid: Optional[str] = None, instance_uid: Optional[str] = None, ) -> Path: """Creates a new Path object based on the provided one. Replaces the specified path components in the base path to create the new one. Args: base_path: The base path to use. store_id: The store ID to use in the new path or None if the store ID from the base path should be used. study_uid: The study UID to use in the new path or None if the study UID from the base path should be used. series_uid: The series UID to use in the new path or None if the series UID from the base path should be used. instance_uid: The instance UID to use in the new path or None if the instance UID from the base path should be used. Returns: The newly constructed Path object. Raises: ValueError if the new path is invalid (e.g. if the instance UID is specified, but the series UID is None). """ default_study_uid = base_path.study_uid default_series_uid = base_path.series_uid default_instance_uid = base_path.instance_uid if store_id is None: store_id = base_path.store_id else: default_study_uid = '' default_series_uid = '' default_instance_uid = '' if study_uid is None: study_uid = default_study_uid else: default_series_uid = '' default_instance_uid = '' if series_uid is None: series_uid = default_series_uid else: default_instance_uid = '' if instance_uid is None: instance_uid = default_instance_uid return Path( base_path.base_address, base_path.healthcare_api_version, base_path.project_id, base_path.location, base_path.dataset_id, store_id, base_path.study_prefix, study_uid, series_uid, instance_uid, )