pathology/dicom_proxy/frame_retrieval_util.py (410 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """DICOM frame retrieval utility.""" from collections.abc import Mapping from concurrent import futures import copy import dataclasses import http.client from typing import Any, List, MutableMapping, Optional, Union import requests import requests_toolbelt from pathology.dicom_proxy import base_dicom_request_error from pathology.dicom_proxy import dicom_proxy_flags from pathology.dicom_proxy import dicom_url_util from pathology.dicom_proxy import enum_types from pathology.dicom_proxy import metadata_util from pathology.dicom_proxy import pydicom_single_instance_read_cache from pathology.dicom_proxy import redis_cache from pathology.dicom_proxy import render_frame_params from pathology.dicom_proxy import user_auth_util from pathology.shared_libs.logging_lib import cloud_logging_client # types _PyDicomSingleInstanceCache = ( pydicom_single_instance_read_cache.PyDicomSingleInstanceCache ) _DicomSopInstanceUrl = dicom_url_util.DicomSopInstanceUrl _AuthSession = user_auth_util.AuthSession _Compression = enum_types.Compression _DicomStoreFrameTransaction = dicom_url_util.DicomStoreFrameTransaction # Constants _ACCEPT = 'Accept' # Expected RAW Metadata _CONTENT_TYPE = 'Content-Type' CACHE_LOADING_FRAME_BYTES = b'LoadingCache' # supported bulk data requests # https://cloud.google.com/healthcare-api/docs/dicom#json_metadata_and_bulk_data_requests _BASELINE_JPEG_MIME_TYPE_AND_TRANSFER_SYNTAX = ( b'image/jpeg; transfer-syntax=1.2.840.10008.1.2.4.50' ) _JPEG2000_LOSSLESS_MIME_TYPE_AND_TRANSFER_SYNTAX = ( b'image/jp2; transfer-syntax=1.2.840.10008.1.2.4.90' ) _JPEG2000_LOSSY_MIME_TYPE_AND_TRANSFER_SYNTAX = ( b'image/jp2; transfer-syntax=1.2.840.10008.1.2.4.91' ) _JPEGXL_LOSSLESS_TRANSFER_SYNTAX = ( b'image/jxl; transfer-syntax=1.2.840.10008.1.2.4.110' ) _JPEGXL_JPEG_RECOMPRESSION_TRANSFER_SYNTAX = ( b'image/jxl; transfer-syntax=1.2.840.10008.1.2.4.111' ) _JPEGXL_TRANSFER_SYNTAX = b'image/jxl; transfer-syntax=1.2.840.10008.1.2.4.112' _JPEGXL_LOSSLESS_APPLICATION_OCTET_STREAM = ( b'application/octet-stream; transfer-syntax=1.2.840.10008.1.2.4.110' ) _JPEGXL_JPEG_RECOMPRESSION_APPLICATION_OCTET_STREAM = ( b'application/octet-stream; transfer-syntax=1.2.840.10008.1.2.4.111' ) _JPEGXL_APPLICATION_OCTET_STREAM = ( b'application/octet-stream; transfer-syntax=1.2.840.10008.1.2.4.112' ) _SUPPORTED_BULK_DATA_REQUESTS = ( _BASELINE_JPEG_MIME_TYPE_AND_TRANSFER_SYNTAX, _JPEG2000_LOSSLESS_MIME_TYPE_AND_TRANSFER_SYNTAX, _JPEG2000_LOSSY_MIME_TYPE_AND_TRANSFER_SYNTAX, _JPEGXL_LOSSLESS_TRANSFER_SYNTAX, _JPEGXL_JPEG_RECOMPRESSION_TRANSFER_SYNTAX, _JPEGXL_TRANSFER_SYNTAX, _JPEGXL_LOSSLESS_APPLICATION_OCTET_STREAM, _JPEGXL_JPEG_RECOMPRESSION_APPLICATION_OCTET_STREAM, _JPEGXL_APPLICATION_OCTET_STREAM, ) class DicomFrameRequestError(base_dicom_request_error.BaseDicomRequestError): """Exception which wraps error responses from DICOM store.""" def __init__(self, response: requests.Response, msg: Optional[str] = None): super().__init__('DicomFrameRequestError', response, msg) class BaseFrameRetrievalError(Exception): """Baseclass for exceptions associated with frame retrieval after request.""" class _FrameIndexError(BaseFrameRetrievalError): """Exception occurred requesting images.""" class _EmptyFrameURLError(BaseFrameRetrievalError): """Exception occurred retrieving images.""" class _InvalidNumberOfReturnedFramesError(BaseFrameRetrievalError): def __init__(self, msg: Optional[str] = None): if msg is None: msg = 'Number of frames returned != number of frames requested.' super().__init__(msg) @dataclasses.dataclass(frozen=True) class FrameData: image: Optional[bytes] downloaded_from_dicom_store: bool class RequestSessionHandler: """Holds current transaction http request session. Enables session reuse within transaction do not use session in parallel. """ def __init__(self): self._session: Optional[requests.Session] = None def session_get( self, url: str, headers: Optional[Mapping[str, str]] ) -> requests.Response: """Returns request session.""" if self._session is None: self._session = requests.Session() return self._session.get(url, headers=headers) def __getstate__(self) -> MutableMapping[str, Any]: """Do not pickle session.""" dct = copy.copy(self.__dict__) del dct['_session'] return dct def __setstate__(self, dct: MutableMapping[str, Any]) -> None: """Init session to empty on de-serializing.""" self.__dict__ = dct self._session = None def __del__(self): """Close session when handler destoryed.""" if self._session is None: return self._session.close() def frame_lru_cache_key(transaction: _DicomStoreFrameTransaction) -> str: """Returns cachetools lru cache key based on transaction request. Args: transaction: DICOM Store query that returns requested frame data. Returns: LRU cache key. """ rendered_image_type = transaction.headers[_ACCEPT] return f'{transaction.url}:{rendered_image_type}' def frame_cache_ttl() -> Optional[int]: """Returns TTL in seconds frames should be held in REDIS frame cache.""" ttl = dicom_proxy_flags.FRAME_CACHE_TTL_FLG.value if ttl < 0: return None return ttl def get_raw_frame_data( user_auth: _AuthSession, instance_url: _DicomSopInstanceUrl, frame_numbers: List[int], render_params: render_frame_params.RenderFrameParams, session_handler: Optional[RequestSessionHandler] = None, ) -> List[FrameData]: """Requests contiguous list of untranscoded frame data from DICOM store. Caches returned frames in redis for use by higher level functions. Args: user_auth: User authentication session, session can be none if reading local instance (PyDicomSingleInstanceCache). instance_url: DICOM Instance to read from. frame_numbers: Frame numbers to return. render_params: Rendered Frame Params. (Requested compression imaging format. Imaging may be returned in a different format and require transcoding and disable caching). session_handler: Request session object to enable connection pooling on session. Set to None if session can be access simultationusly from multiple threads. Returns: List of data encoded in frames with flag set to indicate frames loaded from DICOM Server. Raises: _EmptyFrameURLError: Transaction URL empty. DicomFrameRequestError: HTTP error in DICOM frame request. """ transaction = dicom_url_util.download_dicom_raw_frame( user_auth, instance_url, frame_numbers, render_params, ) if session_handler is None: response = requests.get(transaction.url, headers=transaction.headers) else: response = session_handler.session_get(transaction.url, transaction.headers) if response.status_code != http.client.OK: raise DicomFrameRequestError(response) try: multipart_data = requests_toolbelt.MultipartDecoder.from_response(response) except ( AttributeError, requests_toolbelt.NonMultipartContentTypeException, requests_toolbelt.ImproperBodyPartContentException, ) as exp: raise DicomFrameRequestError( response, msg='NonMultipartContentTypeException' ) from exp try: part_count = len(multipart_data.parts) expected_frame_count = len(transaction.frame_numbers) if part_count != expected_frame_count: raise DicomFrameRequestError( response, msg=( f'Expected {expected_frame_count} multipart response actually' f' received {part_count}' ), ) frame_data = [] for index in range(part_count): part = multipart_data.parts[index] content_type = part.headers.get(_CONTENT_TYPE.encode('utf-8')) if content_type not in _SUPPORTED_BULK_DATA_REQUESTS: raise DicomFrameRequestError( response, msg=( 'Frame request returned invalid content type. Received: ' f'{content_type}' ), ) result = part.content if result is None or not result: raise DicomFrameRequestError( response, msg='Frame request returned no data.' ) frame_data.append(result) except (IndexError, KeyError, AttributeError) as exp: raise DicomFrameRequestError( response, msg='Incorrectly formatted multipart response.' ) from exp redis = redis_cache.RedisCache(transaction.enable_caching) frame_data_list = [] for index, result in zip(frame_numbers, frame_data): transaction = dicom_url_util.download_dicom_raw_frame( user_auth, instance_url, [index], render_params ) cache_key = frame_lru_cache_key(transaction) redis.set( cache_key, result, allow_overwrite=True, ttl_sec=frame_cache_ttl() ) frame_data_list.append(FrameData(result, True)) return frame_data_list def _get_rendered_frame(transaction: _DicomStoreFrameTransaction) -> FrameData: """Returns result of rendered frame request. Caches frames in local redis. Cache mannaged by LRU. Cache works across users and process. Args: transaction: DICOM Store query that returns requested frame data. Returns: Requested frame bytes encoding frame in jpeg or png format. Raises: _EmptyFrameURLError: Transaction URL empty. DicomFrameRequestError: HTTP error in DICOM frame request. """ if not transaction.url: raise _EmptyFrameURLError() redis = redis_cache.RedisCache(transaction.enable_caching) cache_key = frame_lru_cache_key(transaction) result = redis.get(cache_key) if ( result is not None and result.value is not None and result.value != CACHE_LOADING_FRAME_BYTES ): return FrameData(result.value, False) response = requests.get(transaction.url, headers=transaction.headers) if response.status_code != http.client.OK: raise DicomFrameRequestError(response) received_content_type = response.headers.get(_CONTENT_TYPE) expected_content_type = transaction.headers.get(_ACCEPT) if ( received_content_type is None or received_content_type != expected_content_type ): raise DicomFrameRequestError( response, msg=( f'Invalid content type. Expected: {expected_content_type}; ' f'Received: {received_content_type}' ), ) response_bytes = response.content if response_bytes is None or not response_bytes: raise DicomFrameRequestError( response, msg='Frame request returned no data.' ) redis.set(cache_key, response_bytes, ttl_sec=frame_cache_ttl()) return FrameData(response_bytes, True) def _get_raw_frame_list( user_auth: _AuthSession, instance_url: _DicomSopInstanceUrl, frame_numbers: List[int], render_params: render_frame_params.RenderFrameParams, session_handler: Optional[RequestSessionHandler], ) -> List[FrameData]: """Returns list of untranscoded frame data. Caches frames in local redis. Cache mannaged by LRU and works across users and processes. Contiguous blocks of frames retrieved in single HTTP transaction discontinous frames retrieved in parallel. Args: user_auth: User authentication session, session can be none if reading local instance (PyDicomSingleInstanceCache). instance_url: DICOM Instance to read from. frame_numbers: List of frame numbers to return imaging for. render_params: Rendered Frame Params. (Requested compression imaging format. Imaging may be returned in a different format and require transcoding and disable caching). session_handler: Request session object to enable connection pooling on session. Set to None if session can be access simultationusly from multiple threads. Returns: List of bytes encoded in frame with flag indicating if data loaded from cache or server. Raises: _EmptyFrameURLError: Transaction URL empty. DicomFrameRequestError: HTTP error in DICOM frame request. """ if not instance_url: raise _EmptyFrameURLError() if not frame_numbers: return [] results = [] redis = redis_cache.RedisCache(render_params.enable_caching) load_frame_numbers = [] load_frame_index = [] for index, frame_number in enumerate(frame_numbers): result = redis.get( frame_lru_cache_key( dicom_url_util.download_dicom_raw_frame( user_auth, instance_url, [frame_number], render_params ) ) ) if ( result is not None and result.value is not None and result.value != CACHE_LOADING_FRAME_BYTES ): results.append(FrameData(result.value, False)) else: results.append(FrameData(None, True)) load_frame_numbers.append(frame_number) load_frame_index.append(index) if not load_frame_numbers: return results frame_results = get_raw_frame_data( user_auth, instance_url, load_frame_numbers, render_params, session_handler, ) for index, result_index in enumerate(load_frame_index): results[result_index] = frame_results[index] return results def _get_rendered_frame_list( dicom_frames: List[_DicomStoreFrameTransaction], ) -> List[FrameData]: """Returns frame imaging bytes retrieved using thread pool. Args: dicom_frames: List of frame numbers to return imaging for. Returns: List of frame imaging bytes. """ if len(dicom_frames) == 1: return [_get_rendered_frame(dicom_frames[0])] if not dicom_frames: return [] max_workers = min( len(dicom_frames), dicom_proxy_flags.MAX_PARALLEL_FRAME_DOWNLOADS_FLG.value, ) with futures.ThreadPoolExecutor(max_workers=max_workers) as pool: return list(pool.map(_get_rendered_frame, dicom_frames)) def get_local_frame_list( dicom_instance_source: _PyDicomSingleInstanceCache, dicom_frames: List[int], frame_numbers_start_at_index_0: bool = True, ) -> List[FrameData]: """Returns frame imaging bytes retrieved using thread pool. Args: dicom_instance_source: Local DICOM Instance. dicom_frames: List of frame numbers to return imaging for. frame_numbers_start_at_index_0: Frame number index starts at 0. Returns: List of frame imaging bytes. Raises: _FrameIndexError: Invalid frame index requested. """ try: if frame_numbers_start_at_index_0: return [ FrameData(dicom_instance_source.get_encapsulated_frame(index), False) for index in dicom_frames ] return [ FrameData( dicom_instance_source.get_encapsulated_frame(index - 1), False ) for index in dicom_frames ] except IndexError as exp: raise _FrameIndexError( f'Invalid frame index requested; requested frames: {dicom_frames}' ) from exp @dataclasses.dataclass class FrameImages: """Data class holding frame bytes and compression.""" images: MutableMapping[int, bytes] # Mapping of frame number: bytes. compression: _Compression # Compression format of bytes number_of_frames_downloaded_from_store: int def _create_frame_images( frame_numbers: List[int], frame_images: List[FrameData], compression: Optional[_Compression], ) -> FrameImages: """Creates FrameImages dataclass. Args: frame_numbers: List of frame numbers. frame_images: Corresponding list of frame images. compression: Image compression encoding. Raises: _InvalidNumberOfReturnedFramesError: len of frame numbers != len of frame images. Returns: FrameImages Dataclass """ if len(frame_numbers) != len(frame_images): raise _InvalidNumberOfReturnedFramesError() if compression is None: # Should only be raised if additional compression format is added # programmatically and caller to _create_frame_images is not # updated to handle the new format. raise ValueError('Compression format for frames not initialized.') number_of_frames_downloaded_from_store = sum([ 1 for frame_data in frame_images if frame_data.downloaded_from_dicom_store ]) return FrameImages( { frame_index: image_mem.image for frame_index, image_mem in zip(frame_numbers, frame_images) }, compression, number_of_frames_downloaded_from_store, ) def get_dicom_frame_map( user_auth: _AuthSession, dicom_instance_source: Union[ _PyDicomSingleInstanceCache, _DicomSopInstanceUrl ], render_params: render_frame_params.RenderFrameParams, metadata: metadata_util.DicomInstanceMetadata, frames: List[int], session_handler: Optional[RequestSessionHandler] = None, ) -> FrameImages: """Gets frame images from DICOM instance in store or locally. Args: user_auth: User authentication session, session can be none if reading local instance (PyDicomSingleInstanceCache) dicom_instance_source: DICOM Instance to read from, either URL to instance on server (DicomSopInstanceUrl) or Reference to local file (PyDicomSingleInstanceCache) render_params: Rendered Frame Params. (Requested compression imaging format. Imaging may be returned in a different format and require transcoding and disable caching). metadata: Metadata for DICOM Instance. frames: List/set of DICOM frames to return. session_handler: Request session object to enable connection pooling on session. Set to None if session can be access simultationusly from multiple threads. Returns: FrameImages Raises: DicomFrameRequestError: Error occurred retrieving frames. _FrameIndexError: Invalid frame index. _InvalidNumberOfReturnedFramesError: # of frames returned != requested. """ if min(frames) < 0: raise _FrameIndexError( f'Requesting frame # < 0; Frame numbers requested: {frames}' ) if max(frames) >= metadata.number_of_frames: raise _FrameIndexError( 'Requesting frame # >= metadata number of frames; ' f'Number of frames: {metadata.number_of_frames} ' f'Frame numbers requested: {frames}' ) is_local_file = isinstance(dicom_instance_source, _PyDicomSingleInstanceCache) if is_local_file: frame_numbers = [] else: frame_numbers = [frame_number + 1 for frame_number in frames] if not render_params.enable_caching: cloud_logging_client.warning('Frame caching disabled') try: # Raw image retrieval requires imaging be stored in DICOM transfer syntax # DICOM Proxy can decode. This option can result in higher performance and # quality because the image are returned from the store without transcoding. # Returning a rendered instance of JPEG encoded instance from the DICOM # store will result in decoding and re-encoding the encoded instance to # jpeg. Raw retrieval just returns stored bytes. # # The DICOM Proxy only supports direct decoding of Baseline JPEG, JPEG2000, # or RAW encoded pixel data. if metadata.is_baseline_jpeg or metadata.is_jpeg2000 or metadata.is_jpegxl: if is_local_file: frame_images = get_local_frame_list(dicom_instance_source, frames) else: try: frame_images = _get_raw_frame_list( user_auth, dicom_instance_source, frame_numbers, render_params, session_handler, ) except DicomFrameRequestError as exp: cloud_logging_client.warning( ( 'Dicom frame request failed. Trying to retrieve frame using ' 'rendered api. Fail over will reduce performance.' ), exp, ) frame_images = [] # frame retrieval may return empty if instances are returned from # an instance which has transfer syntrax other than Jpeg Baseline, # jpeg 2000, or JPGX if frame_images: if metadata.is_baseline_jpeg: returned_image_compression = _Compression.JPEG elif metadata.is_jpeg2000: returned_image_compression = _Compression.JPEG2000 elif metadata.is_jpg_transcoded_to_jpegxl: returned_image_compression = _Compression.JPEG_TRANSCODED_TO_JPEGXL elif metadata.is_jpegxl: returned_image_compression = _Compression.JPEGXL else: returned_image_compression = None return _create_frame_images( frames, frame_images, returned_image_compression ) if is_local_file: raise ValueError( 'Generation of rendered frames from local files is not supported.' ) params = [] for frame_number in frame_numbers: params.append( dicom_url_util.download_rendered_dicom_frame( user_auth, dicom_instance_source, frame_number, render_params ) ) return _create_frame_images( frames, _get_rendered_frame_list(params), dicom_url_util.get_rendered_frame_compression( render_params.compression ), ) except _InvalidNumberOfReturnedFramesError as exp: if isinstance(dicom_instance_source, _PyDicomSingleInstanceCache): source = dicom_instance_source.path else: source = dicom_instance_source raise _InvalidNumberOfReturnedFramesError( 'Number of images retrieved != number of images requested; ' f'Source: {source}' ) from exp