ez_wsi_dicomweb/local_dicom_slide_cache.py (930 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Local DICOM Slide Read Cache.""" from __future__ import annotations import collections from concurrent import futures import copy import functools import heapq import io import itertools import threading import time import typing from typing import Any, BinaryIO, Iterator, List, Mapping, MutableMapping, Optional, Tuple import uuid import cachetools from ez_wsi_dicomweb import credential_factory as credential_factory_module from ez_wsi_dicomweb import dicom_web_interface from ez_wsi_dicomweb import ez_wsi_errors from ez_wsi_dicomweb import ez_wsi_logging_factory from ez_wsi_dicomweb import local_dicom_slide_cache_types from ez_wsi_dicomweb import slide_level_map from ez_wsi_dicomweb.ml_toolkit import dicom_path import psutil import pydicom _PYDICOM_MAJOR_VERSION = int((pydicom.__version__).split('.')[0]) class _LogKeywords: DICOM_FRAME_NUMBER_RANGE_LIST = 'dicom_frame_number_range_list' DICOM_WEB_INSTANCE_PATH = 'dicom_web_instance_path' EXECUTION_TIME_SEC = 'execution_time_sec' FRAME_NUMBER = 'frame_number' INSTANCE_CACHE_WORKER_TRACE_UID = 'instance_cache_worker_trace_uid' INSTANCE_CACHE_LIFETIME_TRACE_UID = 'instance_cache_lifetime_trace_uid' NUMBER_OF_FRAMES_IN_DICOM_INSTANCE = 'number_of_frames_in_dicom_instance' RUNNING_AS_THREAD = 'running_as_thread' # Number of frames to read when frame request misses the cache. DEFAULT_NUMBER_OF_FRAMES_TO_READ_ON_CACHE_MISS = 500 # Control total number of threads which can be executed concurrently to # orchestrate operations. Orchestrate operations queue operations to # uploaded and download queues. Orchestrator functions queued on a # separate thread pool from upload and downloads to make it impossible for # running orchestrator threads to block execution of upload and download # threads. _MAX_ORCHESTRATOR_WORKER_THREADS = int(4) # Prefer whole instance downloads over frame retrieval if total instance # frame size is smaller than threshold. MAX_INSTANCE_NUMBER_OF_FRAMES_TO_PREFER_WHOLE_INSTANCE_DOWNLOAD = int(10000) # https://www.dicomlibrary.com/dicom/transfer-syntax/ _UNENCAPSULATED_TRANSFER_SYNTAXES = frozenset([ '1.2.840.10008.1.2.1', # Explicit VR Little Endian '1.2.840.10008.1.2', # Implicit VR Endian: Default Transfer Syntax '1.2.840.10008.1.2.1.99', # Deflated Explicit VR Little Endian '1.2.840.10008.1.2.2', # Explicit VR Big Endian ]) _InstanceFameKey = Tuple[int, str] _SharedFrameMemory = MutableMapping[_InstanceFameKey, bytes] def _future_list_built_test_hook( future_list: List[futures.Future[None]], ) -> List[futures.Future[None]]: """Function is a NOP in prod; Acts as mock target for unit test. Used in test_block_until_all_instance_frame_futures_are_loaded mocks. Test validates that block_until_frames_are_loaded waits correctly for futures to complete. Mock enables unit test to ensure that threads will not complete before waiting has begun (the actual unit test case). Other test cases test that block_until_all_instance_frame_futures_are_loaded functions correctly if the tests complete before waiting starts. Args: future_list: List of futures. Returns: List of futures. """ return future_list def _frame_number_key( instance_path: dicom_path.Path, frame_number: int ) -> _InstanceFameKey: """Returns instance frame hash key for shared memory dict.""" return (frame_number, instance_path.complete_url) def _is_unencapsulated_image_transfer_syntax(uid: str) -> bool: """Returns True if uid is in the list of raw transfer syntaxes.""" return uid in _UNENCAPSULATED_TRANSFER_SYNTAXES def _load_frame_list(buffer: BinaryIO) -> List[bytes]: """Loads DICOM instance frames into memory as a list of frames(bytes). Args: buffer: Buffer containing binary DICOM instance data. Returns: List of bytes encoded in DICOM instance frames. """ with pydicom.dcmread(buffer) as ds: if 'PixelData' not in ds or not ds.PixelData: return [] try: number_of_frames = int(ds.NumberOfFrames) except (ValueError, AttributeError) as _: return [] if number_of_frames < 1: return [] if _is_unencapsulated_image_transfer_syntax(ds.file_meta.TransferSyntaxUID): step = int(len(ds.PixelData) / number_of_frames) return [ ds.PixelData[fnum * step : (fnum + 1) * step] for fnum in range(number_of_frames) ] if _PYDICOM_MAJOR_VERSION <= 2: # pytype: disable=module-attr frame_bytes_generator = pydicom.encaps.generate_pixel_data_frame( ds.PixelData, number_of_frames ) # pytype: enable=module-attr else: # pytype: disable=module-attr frame_bytes_generator = pydicom.encaps.generate_frames( ds.PixelData, number_of_frames=number_of_frames ) # pytype: enable=module-attr return [frame_bytes for frame_bytes in frame_bytes_generator] def _get_frame_number_range_list( frame_list: List[int], logger: ez_wsi_logging_factory.AbstractLoggingInterface, ) -> List[Tuple[int, int]]: """Converts list of frame numbers into inclusive list ranges of frame numbers. Converts [1, 2, 3, 8, 9, 10] -> [(1, 3), (8, 10)] Args: frame_list: List of frames to load. logger: CloudLoggingClientInstance Returns: List of Frame number ranges to load. Raises: InvalidFrameNumberError: Frame number values < 1 were provided. """ frame_number_range_list = [] start_frame_number = None prior_frame_number = -1 for fnum in frame_list: if fnum < 1: raise local_dicom_slide_cache_types.InvalidFrameNumberError( f'DICOM Frame numbers must be >= 1; encountered: {fnum}.' ) if fnum < prior_frame_number: logger.warning( 'Performance could be improved by providing the list of frame numbers' ' in sorted order.' ) return _get_frame_number_range_list(sorted(frame_list), logger) if start_frame_number is None: start_frame_number = fnum prior_frame_number = fnum elif prior_frame_number == fnum: continue elif prior_frame_number + 1 == fnum: prior_frame_number = fnum else: frame_number_range_list.append((start_frame_number, prior_frame_number)) start_frame_number = fnum prior_frame_number = fnum if start_frame_number is not None: frame_number_range_list.append((start_frame_number, prior_frame_number)) return frame_number_range_list def _get_instance_path_list( instance_path: local_dicom_slide_cache_types.InstancePathType, ) -> List[Tuple[dicom_path.Path, int]]: """Returns a list of paths to DICOM instances specified by param. Args: instance_path: Value which represents one or more DICOM web instances. Returns: List of tuples[DICOM web instance paths, number of frames in instance]. Raises: local_dicom_slide_cache_types.UnexpectedTypeError: Instance_path is an unexpected type. """ if isinstance(instance_path, slide_level_map.Level): return [ (instance.dicom_object.path, instance.frame_count) for instance in instance_path.instances.values() ] elif isinstance(instance_path, slide_level_map.Instance): return [( instance_path.dicom_object.path, instance_path.frame_count, )] else: raise local_dicom_slide_cache_types.UnexpectedTypeError( 'Unexpected DICOM web instance path type.' ) def _log_elapsed_time(start_time: float) -> Mapping[str, Any]: return {_LogKeywords.EXECUTION_TIME_SEC: time.time() - start_time} class InMemoryDicomSlideCache: """In memory cache for EZ-WSI library access of DICOM Instances. Cache functions by downloading DICOM instance pixel data from DICOM store in blocks or entire instances. The total size of the frame cache can be managed via a size (bytes) limited LRU. The primary purposes of the cache is to speed access to DICOM frame data and reduce the total number of DICOM queries required to fetch digital pathology frame data from the DICOM store. Attributes: _dicom_instance_frame_bytes: Shared memory used to hold DICOM instance frame bytes. _number_of_frames_to_read: Number of frames of data to read on cache miss. _dicom_web_interface: Interface for DICOMweb. _lock: Threading lock to make cache access thread safe. _cache_stats: Cache state and logs. _orchestrator_thread_pool: Thread pool for high level ops which queue upload and download ops. lru_caching_enabled: True if cache a LRU cache that has a defined max size (bytes) | or False no defined max size. cache_instance_uid: UID for the cache instance preserved for lifetime of the cache and across pickle operations. optimization_hint: Cache optmization hint. """ def __init__( self, credential_factory: credential_factory_module.AbstractCredentialFactory, max_cache_frame_memory_lru_cache_size_bytes: Optional[int] = None, number_of_frames_to_read: int = DEFAULT_NUMBER_OF_FRAMES_TO_READ_ON_CACHE_MISS, max_instance_number_of_frames_to_prefer_whole_instance_download: int = MAX_INSTANCE_NUMBER_OF_FRAMES_TO_PREFER_WHOLE_INSTANCE_DOWNLOAD, optimization_hint: local_dicom_slide_cache_types.CacheConfigOptimizationHint = local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_DICOM_STORE_QPM, logging_factory: Optional[ ez_wsi_logging_factory.AbstractLoggingInterfaceFactory ] = None, ): """Initializes InMemoryDicomSlideCache. Args: credential_factory: Factory to create credentials to use to access the DICOM store. max_cache_frame_memory_lru_cache_size_bytes: Maximum size of cache in bytes. Ideally should be in hundreds of megabyts-to-gigabyte size. If None, no limit to size. number_of_frames_to_read: Number of frames to read on cache miss. max_instance_number_of_frames_to_prefer_whole_instance_download: Max number of frames to prefer downloading whole instances over retrieving frames in batch (Typically faster for small instances e.g. < 10,0000). Optimal threshold will depend on average size of instance frame data and the size of non frame instance metadata. optimization_hint: Optimize cache to minimize data latency or total queries to the DICOM store. logging_factory: Factory to create logging interface defaults to Python logger. Raises: InvalidLRUMaxCacheSizeError: Invalid LRU max cache size. """ if max_cache_frame_memory_lru_cache_size_bytes is None: self._max_cache_frame_memory_lru_cache_size_bytes = None elif max_cache_frame_memory_lru_cache_size_bytes < 1: raise local_dicom_slide_cache_types.InvalidLRUMaxCacheSizeError() else: self._max_cache_frame_memory_lru_cache_size_bytes = ( max_cache_frame_memory_lru_cache_size_bytes ) self._dicom_instance_frame_bytes: _SharedFrameMemory = ( self._init_dicom_instance_frame_bytes() ) if logging_factory is None: self._logging_factory = ez_wsi_logging_factory.BasePythonLoggerFactory() else: self._logging_factory = logging_factory self._cache_instance_uid = uuid.uuid1() self._logger = None self._number_of_frames_to_read = int(max(number_of_frames_to_read, 1)) self._dicom_web_interface = dicom_web_interface.DicomWebInterface( credential_factory ) # Primary lock used to protect shared class state aross threads. # Required to be RLock to protect against future add_done_callback # callback finishing while locked causing a deadlock. self._lock = threading.RLock() # Protects lazy initialized class state (logger & authentication) # Different lock from self._lock to enable state to be safely initialized # independently from broader lock. self._initialization_lock = threading.Lock() self._cache_stats = local_dicom_slide_cache_types.CacheStats() self._orchestrator_thread_pool = None # Maps dicom_web_instance_path to dict of thread futures being run to load # instance frames. This dict in turn maps these thread futures to list of # frame ranges being loaded by the thread. self._running_futures: MutableMapping[ str, MutableMapping[futures.Future[None], List[Tuple[int, int]]] ] = collections.defaultdict(dict) self._optimization_hint = optimization_hint self._max_instance_number_of_frames_to_prefer_whole_instance_download = ( max_instance_number_of_frames_to_prefer_whole_instance_download ) self._init_thread_pool() @property def optimization_hint( self, ) -> local_dicom_slide_cache_types.CacheConfigOptimizationHint: return self._optimization_hint @optimization_hint.setter def optimization_hint( self, optimization_hint: local_dicom_slide_cache_types.CacheConfigOptimizationHint, ) -> None: self._optimization_hint = optimization_hint @property def cache_instance_uid(self) -> str: """Returns cache instance UID.""" return str(self._cache_instance_uid) @property def lru_caching_enabled(self) -> bool: return isinstance(self._dicom_instance_frame_bytes, cachetools.LRUCache) def cache_externally_acquired_bytes(self, key: str, data: bytes) -> bool: """Adds externally acquired bytes to cache. Args: key: Cache key for external bytes. data: Bytes to add. Returns: True if data added to cache. """ if ( self._max_cache_frame_memory_lru_cache_size_bytes is not None and len(data) > self._max_cache_frame_memory_lru_cache_size_bytes and self.lru_caching_enabled ): self._get_logger().warning( 'Data not cached. The maximum size in bytes of the LRU cache is ' 'smaller than the total size in bytes of the data. Data size: ' f'{len(data)} bytes; Maximum size of cache: ' f'{self._max_cache_frame_memory_lru_cache_size_bytes}.' ) return False self._dicom_instance_frame_bytes[f'ext:{key}'] = data return True def get_cached_externally_acquired_bytes(self, key: str) -> Optional[bytes]: """Returns acquired bytes from cache or None if key not found.""" return self._dicom_instance_frame_bytes.get(f'ext:{key}') def _init_dicom_instance_frame_bytes( self, ) -> _SharedFrameMemory: if self._max_cache_frame_memory_lru_cache_size_bytes is not None: return cachetools.LRUCache( maxsize=self._max_cache_frame_memory_lru_cache_size_bytes, getsizeof=len, ) else: return dict() def _set_cached_frame( self, instance_path: dicom_path.Path, frame_number: int, frame_bytes: bytes, ) -> None: """Set instance frame bytes in cache.""" cache_key = _frame_number_key(instance_path, frame_number) try: self._dicom_instance_frame_bytes[cache_key] = frame_bytes except ValueError as exp: if ( self.lru_caching_enabled and len(frame_bytes) > self._max_cache_frame_memory_lru_cache_size_bytes ): self._get_logger().warning( 'The maximum size in bytes of the LRU cache is smaller than the ' 'size of a single DICOM frame; LRU maximum cache size (bytes): ' f'{self._max_cache_frame_memory_lru_cache_size_bytes}; ' f'Size in bytes of DICOM frame: {len(frame_bytes)}. For optimal ' "performance LRU cache should be large enough to store 10's-100's" ' of thousands of DICOM frames.', exp, ) else: self._get_logger().warning( 'Unexpected value error occurred adding frame data to cache.', exp ) def _add_cached_instance_frames( self, instance_path: dicom_path.Path, first_frame_number: int, frame_list: List[bytes], ) -> None: """Adds instance frame data to frame cache. Args: instance_path: DICOMweb instance path. first_frame_number: First frame number of frame to set. frame_list: List of bytes for consecutive frames. Returns: None """ for frame_number in range( first_frame_number, first_frame_number + len(frame_list) ): frame_bytes = frame_list[frame_number - first_frame_number] self._set_cached_frame(instance_path, frame_number, frame_bytes) def _total_frame_bytes_read(self, dicom_frames: List[bytes]) -> int: """Returns total number of bytes in list of frames.""" total_frame_bytes_read = 0 for frame_bytes in dicom_frames: total_frame_bytes_read += len(frame_bytes) if ( self._max_cache_frame_memory_lru_cache_size_bytes is not None and total_frame_bytes_read > self._max_cache_frame_memory_lru_cache_size_bytes and self.lru_caching_enabled ): self._get_logger().warning( 'The maximum size in bytes of the LRU cache is smaller than the ' f'total size in bytes of the block of {len(dicom_frames)} DICOM ' 'frame(s) that was added to the cache in a single cache miss event; ' 'LRU maximum cache size (bytes): ' f'{self._max_cache_frame_memory_lru_cache_size_bytes}; ' f'Total size in bytes of DICOM frame(s): {total_frame_bytes_read}. ' 'For optimal performance LRU cache size should be be increased to a ' 'size much larger than the total number of frame bytes read at a ' 'single cache miss event.' ) return total_frame_bytes_read def _get_logger_signature(self) -> Mapping[str, Any]: return { _LogKeywords.INSTANCE_CACHE_WORKER_TRACE_UID: str(uuid.uuid1()), _LogKeywords.INSTANCE_CACHE_LIFETIME_TRACE_UID: str( self._cache_instance_uid ), } def _get_logger( self, ) -> ez_wsi_logging_factory.AbstractLoggingInterface: if self._logger is not None: return self._logger with self._initialization_lock: if self._logger is None: self._logger = self._logging_factory.create_logger( self._get_logger_signature() ) return self._logger def _init_thread_pool(self) -> None: """Init cache thread pools.""" # Threads in this pool conduct high level background cache loading ops. self._orchestrator_thread_pool = futures.ThreadPoolExecutor( _MAX_ORCHESTRATOR_WORKER_THREADS ) def __copy__(self) -> InMemoryDicomSlideCache: """Returns shallow copy of cache settings; does not cached data.""" cache_copy = InMemoryDicomSlideCache( credential_factory=self._dicom_web_interface.credential_factory, max_cache_frame_memory_lru_cache_size_bytes=self._max_cache_frame_memory_lru_cache_size_bytes, number_of_frames_to_read=self._number_of_frames_to_read, max_instance_number_of_frames_to_prefer_whole_instance_download=self._max_instance_number_of_frames_to_prefer_whole_instance_download, optimization_hint=self._optimization_hint, logging_factory=self._logging_factory, ) # maintain instance_cache_trace_uid across copy cache_copy._cache_instance_uid = self._cache_instance_uid return cache_copy def __deepcopy__(self, memo) -> InMemoryDicomSlideCache: return self.__copy__() def __getstate__(self): """Prepares class dictionary for pickeling. Make sure any operations loading GCS cache are completed before pickling. Deletes class state with thread pools, GCP clients, auth, and status. Returns: class dict """ state = self.__dict__.copy() del state['_dicom_instance_frame_bytes'] del state['_lock'] del state['_initialization_lock'] del state['_cache_stats'] del state['_orchestrator_thread_pool'] del state['_running_futures'] del state['_logger'] return state def __setstate__(self, dct): """Un-pickles class and re-initializes non-pickled properties.""" self.__dict__ = dct self._dicom_instance_frame_bytes = self._init_dicom_instance_frame_bytes() self._cache_stats = local_dicom_slide_cache_types.CacheStats() self._running_futures = collections.defaultdict(dict) self._init_thread_pool() self._logger = None self._lock = threading.RLock() self._initialization_lock = threading.Lock() def _get_frame_bytes( self, instance_path: dicom_path.Path, frame_number: int ) -> Optional[bytes]: """Gets frame bytes for frame from cache. Args: instance_path: DICOMweb instance path. frame_number: Frame number to return data for. Returns: Frame bytes or None if not set """ return self._dicom_instance_frame_bytes.get( _frame_number_key(instance_path, frame_number) ) def _is_frame_number_loaded( self, instance_path: dicom_path.Path, frame_number: int ) -> bool: """Returns True if frame number is loaded.""" return ( _frame_number_key(instance_path, frame_number) in self._dicom_instance_frame_bytes ) def _remove_finished_future( self, instance_path: dicom_path.Path, future: futures.Future[None] ) -> None: """Call back called by futures to remove self from loading future list.""" with self._lock: instance_url = instance_path.complete_url instance_futures = self._running_futures.get(instance_url) if instance_futures is None: return try: del instance_futures[future] except KeyError: pass if not instance_futures: del self._running_futures[instance_url] def _handle_future( self, instance_path: dicom_path.Path, loading_frames: List[Tuple[int, int]], future: futures.Future[None], ) -> None: """Adds callback to future to remove future from monitor list.""" # Add future to running list remove_future_partial = functools.partial( self._remove_finished_future, instance_path ) self._running_futures[instance_path.complete_url][future] = loading_frames # Add call back to remove future from running list future.add_done_callback(remove_future_partial) def _is_frame_number_loading( self, instance_path: dicom_path.Path, frame_number: int ) -> bool: """Returns True if instance frame_number is being loaded in a thread.""" for frame_range_list in self._running_futures[ instance_path.complete_url ].values(): for start_frame_number, end_frame_number in frame_range_list: if ( start_frame_number <= frame_number and frame_number <= end_frame_number ): return True return False def _get_instance_future_loading_frame_ranges( self, instance_path: dicom_path.Path ) -> Iterator[List[Tuple[int, int]]]: """Returns iterator of lists of frame ranges being loaded for an instance.""" return typing.cast( Iterator[List[Tuple[int, int]]], self._running_futures[instance_path.complete_url].values(), ) def _clip_frame_range_to_loading_frames( self, instance_path: dicom_path.Path, frame_range: Optional[Tuple[int, int]], ) -> Optional[Tuple[int, int]]: """Clips starting and ending frame range based on loading frames. If cache miss occurs a range of frames is requested around the missed frame; by default ~500 frames will be retrieved. Rapid repeated cache misses can result in duplicate concurrent async requests for overlapping ranges of frames. This code clips a frame range to exclude frames at the start and end of the range which are being currently loaded in another thread. If a frame range is bound by frames which are not currently being loaded but were to contain frames within the range then the frames within and the range will that are being loaded will not be excluded. This limitation is not expected to occur in practice. Example: Range: (1, 6) is clipped Ranges (1, 1) and (6,6) are being loaded on another thread. (1, 6) will be clipped to (2, 5). However, if frame (1, 6) is clipped and range (2, 4) is being loaded on another thread then (1, 6) will be returned (frames 1 and 6 are not loading). Args: instance_path: DICOM web path to instance. frame_range: Inclusive frame number range to load. Returns: Clipped starting frame number and end frame number or None if entire frame range is already being loaded. """ # Merge all loading frame range lists into single frame list sorted by # starting frame range position. if frame_range is None: return None start_frame_range, end_frame_range = frame_range for range_tpl in heapq.merge( *self._get_instance_future_loading_frame_ranges(instance_path), key=lambda x: x[0], ): start_frame_number, end_frame_number = range_tpl if ( start_frame_number <= start_frame_range and start_frame_range <= end_frame_number ): start_frame_range = end_frame_number + 1 if end_frame_range < start_frame_range: return None continue if start_frame_range < start_frame_number: break # Merge all loading frame range lists into single frame list sorted in # reverse by ending frame range position. range_end_frame_list = list( heapq.merge( *self._get_instance_future_loading_frame_ranges(instance_path), key=lambda x: x[1], ) ) range_end_frame_list.reverse() for range_tpl in range_end_frame_list: start_frame_number, end_frame_number = range_tpl if ( start_frame_number <= end_frame_range and end_frame_range <= end_frame_number ): end_frame_range = start_frame_number - 1 continue if end_frame_range > end_frame_number: break return (start_frame_range, end_frame_range) def block_until_frames_are_loaded( self, instance_path: Optional[dicom_path.Path] = None, timeout: Optional[float] = 600.0, ) -> float: """Blocks until all futures in future list, at time of call, are done. Args: instance_path: If defined blocks on futures associated with instance. timeout: Time(sec) to wait for cache loading blocks to finish; None=inf. Returns: Time waiting for cache loading to complete. """ start_time = time.time() with self._lock: if not self._running_futures: return 0.0 if instance_path is not None: instance_futures = self._running_futures.get(instance_path.complete_url) else: instance_futures = None if instance_path is not None and instance_futures is not None: # Block only on instance futures. future_list = list(instance_futures) elif instance_path is None: # Block for all running futures. future_list = [] for instance_futures in self._running_futures.values(): future_list.extend(instance_futures) else: # Instance path does not describe currently running future. future_list = [] for future in _future_list_built_test_hook(future_list): future.result(timeout=timeout) elapsed_time = time.time() - start_time self._get_logger().debug( f'Blocked until frame loading completed ({elapsed_time}(sec)).', {_LogKeywords.EXECUTION_TIME_SEC: elapsed_time}, ) with self._lock: self._cache_stats.time_spent_blocked_waiting_for_cache_loading_to_complete += ( elapsed_time ) return elapsed_time def _update_frame_block_cache_stats_bytes_read( self, start_time: float, dicom_frames: List[bytes] ) -> None: self._cache_stats.number_of_frame_bytes_read_in_frame_blocks += ( self._total_frame_bytes_read(dicom_frames) ) self._cache_stats.frame_block_read_time += time.time() - start_time self._cache_stats.number_of_frame_blocks_read += 1 self._cache_stats.number_of_frames_read_in_frame_blocks += len(dicom_frames) def _load_frame_number_ranges_thread( self, instance_path: dicom_path.Path, frame_number_range_list: List[Tuple[int, int]], ) -> None: """Loads list of DICOM instance frames numbers in batch into the cache. Args: instance_path: DICOM instance path. frame_number_range_list: List of Frame Number ranges to load. Returns: None """ if not frame_number_range_list: return start_time = time.time() log_structure = { _LogKeywords.DICOM_WEB_INSTANCE_PATH: instance_path, _LogKeywords.DICOM_FRAME_NUMBER_RANGE_LIST: frame_number_range_list, } try: dicom_frames = self._dicom_web_interface.download_instance_frame_list_untranscoded( instance_path, itertools.chain(*[ range(start_frame_number, end_frame_number + 1) for start_frame_number, end_frame_number in frame_number_range_list ]), retry=False, ) with self._lock: offset = 0 for start_frame_number, end_frame_number in frame_number_range_list: length = end_frame_number - start_frame_number + 1 self._add_cached_instance_frames( instance_path, start_frame_number, dicom_frames[offset : offset + length], ) offset += length self._update_frame_block_cache_stats_bytes_read( start_time, dicom_frames ) self._get_logger().info( 'Finished asyc loading DICOM frame number range(s) into cache.', log_structure, _log_elapsed_time(start_time), ) except ( ez_wsi_errors.HttpError, ez_wsi_errors.DownloadInstanceFrameError, ) as exp: self._get_logger().error( 'Exception occurred caching DICOM instance frames.', exp, log_structure, _log_elapsed_time(start_time), ) return except Exception as exp: self._get_logger().error( 'Exception occurred caching DICOM instance frames.', exp, log_structure, _log_elapsed_time(start_time), ) raise def _update_dicom_instance_cache_stats_bytes_read( self, start_time: float, dicom_frames: List[bytes] ) -> None: self._cache_stats.number_of_frame_bytes_read_in_dicom_instances += ( self._total_frame_bytes_read(dicom_frames) ) self._cache_stats.dicom_instance_read_time += time.time() - start_time self._cache_stats.number_of_dicom_instances_read += 1 self._cache_stats.number_of_frames_read_in_dicom_instances += len( dicom_frames ) def _cache_whole_instance_in_memory_thread( self, instance_path: dicom_path.Path, number_of_frames: int, running_as_thread: bool, ) -> None: """Loads frames from whole instance in thread. Args: instance_path: DICOM instance path to load. number_of_frames: number of frames in DICOM instance. running_as_thread: True if method running in thread (async). """ start_time = time.time() log_structure = { _LogKeywords.DICOM_WEB_INSTANCE_PATH: instance_path, _LogKeywords.NUMBER_OF_FRAMES_IN_DICOM_INSTANCE: number_of_frames, _LogKeywords.RUNNING_AS_THREAD: running_as_thread, } try: with io.BytesIO() as buffer: try: self._dicom_web_interface.download_instance_untranscoded( instance_path, buffer, retry=False ) except ez_wsi_errors.HttpError as exp: self._get_logger().error( 'Could not download DICOM instance.', log_structure, _log_elapsed_time(start_time), exp, ) return buffer.seek(0) dicom_frames = _load_frame_list(buffer) with self._lock: number_of_frames_downloaded = len(dicom_frames) if number_of_frames != number_of_frames_downloaded: self._get_logger().warning( 'Expected number of frames does not match actual DICOM instance;' ' Number of frames in DICOM instance:' f' {number_of_frames_downloaded}; Expected number of frames:' f' {number_of_frames}.', log_structure, ) number_of_frames = number_of_frames_downloaded if number_of_frames == 0: self._get_logger().warning( 'Cached whole instance DICOM instance contains zero frames.', log_structure, _log_elapsed_time(start_time), ) else: self._add_cached_instance_frames(instance_path, 1, dicom_frames) self._get_logger().info( 'Cached whole instance DICOM instance.', log_structure, _log_elapsed_time(start_time), ) self._update_dicom_instance_cache_stats_bytes_read( start_time, dicom_frames ) except Exception as exp: self._get_logger().error( 'Exception occurred caching whole DICOM instance.', exp, log_structure, _log_elapsed_time(start_time), ) raise def _filter_loaded_or_loading_frame_numbers( self, instance_path: dicom_path.Path, frame_numbers: List[int], ) -> List[int]: """Returns frames numbers in list which are not loaded or loading.""" filtered_frame_numbers = [] for fnum in frame_numbers: if self._is_frame_number_loading(instance_path, fnum): continue if self._is_frame_number_loaded(instance_path, fnum): continue filtered_frame_numbers.append(fnum) return filtered_frame_numbers def preload_instance_frame_numbers( self, instance_frame_numbers: Mapping[str, List[int]], copy_from_cache: Optional[InMemoryDicomSlideCache] = None, ) -> None: """Preloads select instance frames from DICOM Store into cache. Args: instance_frame_numbers: Map instance path to frame numbers. copy_from_cache: Optional cache to copy frames from. Returns: None. """ with self._lock: logger = self._get_logger() logger.info('Preloading DICOM instance frames') for instance_path, frame_numbers in instance_frame_numbers.items(): instance_path = dicom_path.FromString(instance_path) if not frame_numbers: continue if copy_from_cache is not None: frame_numbers_not_copied = [] for frame_number in frame_numbers: frame_bytes = copy_from_cache.get_cached_frame( instance_path, frame_number ) if frame_bytes is not None: self._set_cached_frame(instance_path, frame_number, frame_bytes) else: frame_numbers_not_copied.append(frame_number) frame_numbers = frame_numbers_not_copied frame_number_range_list = _get_frame_number_range_list( self._filter_loaded_or_loading_frame_numbers( instance_path, frame_numbers ), logger, ) if not frame_number_range_list: continue ex = typing.cast( futures.ThreadPoolExecutor, self._orchestrator_thread_pool ) self._handle_future( instance_path, frame_number_range_list, ex.submit( self._load_frame_number_ranges_thread, instance_path, frame_number_range_list, ), ) def _cache_whole_instance_in_memory( self, instance_path: dicom_path.Path, number_of_frames: int, blocking: bool, ) -> Optional[futures.Future[None]]: """Loads whole DICOM instance into memory. Args: instance_path: DICOMweb path to instance. number_of_frames: Number of frames in DICOM instance. blocking: Load cache as blocking operation. Returns: Future if async operation started or None. """ if number_of_frames <= 0: return None # if entire instance is currently loading skip. if ( self._clip_frame_range_to_loading_frames( instance_path, (1, number_of_frames) ) is None ): return None if blocking: self._cache_whole_instance_in_memory_thread( instance_path, number_of_frames, running_as_thread=False ) return None ex = typing.cast(futures.ThreadPoolExecutor, self._orchestrator_thread_pool) return ex.submit( self._cache_whole_instance_in_memory_thread, instance_path, number_of_frames, running_as_thread=True, ) def cache_whole_instance_in_memory( self, instance_paths: local_dicom_slide_cache_types.InstancePathType, blocking: bool, ) -> None: """Caches whole DICOM instance in memory. Args: instance_paths: DICOMweb path to instance. blocking: Load cache as blocking operation. """ for instance_path, number_of_frames in _get_instance_path_list( instance_paths ): future = self._cache_whole_instance_in_memory( instance_path, number_of_frames, blocking ) if future is None: continue with self._lock: frame_range_to_load = (1, number_of_frames) self._handle_future(instance_path, [frame_range_to_load], future) def _get_frame_range_to_load( self, instance_path: dicom_path.Path, number_of_frames: int, frame_number: int, max_request: int, ) -> Optional[Tuple[int, int]]: """Get the range of frames to load in cached frame block. Always return frame_number in returned range. Args: instance_path: DICOMweb instance path. number_of_frames: Number of Frames in DICOM instance. frame_number: Frame number triggering caching. max_request: Maximum size of frame cache block. Returns: Tuple[Initialized starting frame number, ending frame number] or None if no valid frame range. """ if ( number_of_frames < 1 or max_request < 1 or frame_number > number_of_frames or frame_number < 1 ): return None last_frame_number = min(number_of_frames, frame_number + max_request - 1) for last_frame_number in range(last_frame_number, frame_number - 1, -1): cache_key = _frame_number_key(instance_path, last_frame_number) if cache_key not in self._dicom_instance_frame_bytes: break # DICOM frame numbers start at 1 first_frame_number = max(last_frame_number - max_request + 1, 1) for first_frame_number in range(first_frame_number, frame_number + 1): cache_key = _frame_number_key(instance_path, first_frame_number) if cache_key not in self._dicom_instance_frame_bytes: break return first_frame_number, last_frame_number def _start_load_frame_number_range( self, instance_path: dicom_path.Path, number_of_frames: int, frame_number: int, max_request: int, ) -> None: """Call to start background thread loading DICOM instance into cache. Args: instance_path: DICOM web instance path. number_of_frames: Total number of frames in the instance. frame_number: Missing instance frame number which triggered load instance. max_request: Maximum number of frames to load. """ if number_of_frames <= 0: # if no frames do nothing. return if ( number_of_frames < self._max_instance_number_of_frames_to_prefer_whole_instance_download ): future = self._cache_whole_instance_in_memory( instance_path, number_of_frames, blocking=False ) if future is None: return frame_range_to_load = (1, number_of_frames) self._handle_future(instance_path, [frame_range_to_load], future) return frame_range_to_load = self._get_frame_range_to_load( instance_path, number_of_frames, frame_number, max_request, ) frame_range_to_load = self._clip_frame_range_to_loading_frames( instance_path, frame_range_to_load ) if frame_range_to_load is None: return ex = typing.cast(futures.ThreadPoolExecutor, self._orchestrator_thread_pool) self._handle_future( instance_path, [frame_range_to_load], ex.submit( self._load_frame_number_ranges_thread, instance_path, [frame_range_to_load], ), ) def _get_optmization_hint( self, number_of_frames: int, ) -> local_dicom_slide_cache_types.CacheConfigOptimizationHint: """Initializes hint based on class settings and number of frames.""" if ( number_of_frames == 1 and self._optimization_hint == local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_LATENCY ): # Performance optimization. If number of frames in DICOM instance is 1 # then automatically wait for frame data to return. return ( local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_DICOM_STORE_QPM ) return self._optimization_hint def _handle_cache_miss_minimize_dicom_store_qpm_optimization( self, instance_path: dicom_path.Path, frame_number: int, log_struct: Mapping[str, Any], ) -> Optional[bytes]: """Returns requested frame after waiting for futures to finish. Reduces store queries per mininute (QPM) by utilizing batch mechanism for filling cache and waiting for cache to fill before returning. Args: instance_path: DICOM web instance path. frame_number: Frame number requested. log_struct: Additional items to include in structured logs. Returns: Frame bytes or None if futures failed to load frame bytes. """ self.block_until_frames_are_loaded(instance_path) self._get_logger().debug( 'Reducing DICOM store queries by waiting for background thread frame' ' requests to complete.', log_struct, ) with self._lock: return self._get_frame_bytes(instance_path, frame_number) def _handle_cache_miss_minimize_latency_optimization( self, instance_path: dicom_path.Path, frame_number: int, log_struct: Mapping[str, Any], ) -> Optional[bytes]: """Retrieve and return requested frame indivually to reduce latency. Single frame requests from the store increase store queries per minute and are slow proportionally (base on bytes transferred / time). However these requests are still fast. Request the frame data and return to unblock other operations. Cache will likely be filled on subsequent read. The method log_cache_stats can be used to log and inspect cache behavior and perf. Args: instance_path: DICOM web instance path. frame_number: Frame number requested. log_struct: Additional items to include in structured logs. Returns: Frame bytes or None if futures failed to load frame bytes. """ self._get_logger().debug( 'Reducing latency by downloading DICOM frame immediately from the' ' DICOM store.', log_struct, ) start_time = time.time() try: dcm_frames = ( self._dicom_web_interface.download_instance_frames_untranscoded( instance_path, frame_number, frame_number, retry=False ) ) except ( ez_wsi_errors.HttpError, ez_wsi_errors.DownloadInstanceFrameError, ) as exp: self._get_logger().error( 'Exception occurred caching DICOM instance frames.', exp, log_struct, _log_elapsed_time(start_time), ) return None if len(dcm_frames) != 1: return None with self._lock: # Add retrieved frame to cache, to avoid re-retrieval if frame # requested again before background caching operation completes. self._add_cached_instance_frames(instance_path, frame_number, dcm_frames) self._cache_stats.number_of_frames_downloaded_to_reduce_latency += 1 self._cache_stats.time_spent_downloading_frames_to_reduce_latency += ( time.time() - start_time ) return dcm_frames[0] def get_cached_frame( self, instance_path: dicom_path.Path, frame_number: int, ) -> Optional[bytes]: """Returns instance frame bytes from cache or None if not found. Args: instance_path: DICOM web instance path. frame_number: Frame number with in DICOM instance, First frame = 1 Returns: Frames bytes or None """ if frame_number < 1: return None with self._lock: return self._get_frame_bytes(instance_path, frame_number) def get_frame( self, instance_path: dicom_path.Path, number_of_frames: int, frame_number: int, optimization_hint: Optional[ local_dicom_slide_cache_types.CacheConfigOptimizationHint ] = None, ) -> Optional[bytes]: """Returns frame bytes from DICOM instance or None if not found. If frame bytes not found and instance is in GCS cache, bytes for a frame block that includes the requested frame will be loaded on a background thread. Args: instance_path: DICOM web instance path. number_of_frames: Number of frames in DICOM instance. frame_number: Frame number with in DICOM instance, First frame = 1 optimization_hint: Optimization to control performance optimizations that occure on a cache miss. Returns: Frames bytes or None """ if frame_number < 1 or frame_number > number_of_frames: return None if optimization_hint is None: optimization_hint = self._get_optmization_hint(number_of_frames) with self._lock: if self._cache_stats.first_frame_number_read is None: self._cache_stats.first_frame_number_read = frame_number self._cache_stats.last_frame_number_read = frame_number else: self._cache_stats.first_frame_number_read = min( frame_number, self._cache_stats.first_frame_number_read ) self._cache_stats.last_frame_number_read = max( frame_number, self._cache_stats.last_frame_number_read ) frame_bytes = self._get_frame_bytes(instance_path, frame_number) if frame_bytes is not None: self._cache_stats.frame_cache_hit_count += 1 return frame_bytes log_struct = { _LogKeywords.DICOM_WEB_INSTANCE_PATH: instance_path, _LogKeywords.FRAME_NUMBER: frame_number, } if not self._is_frame_number_loading(instance_path, frame_number): self._start_load_frame_number_range( instance_path, number_of_frames, frame_number, self._number_of_frames_to_read, ) self._get_logger().debug( f'Cache Miss; Frame: {frame_number}; Starting frame loading', log_struct, ) else: self._get_logger().debug( f'Cache Miss; Frame: {frame_number}; Frame loading in progress', log_struct, ) self._cache_stats.frame_cache_miss_count += 1 if ( local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_DICOM_STORE_QPM == optimization_hint ): return self._handle_cache_miss_minimize_dicom_store_qpm_optimization( instance_path, frame_number, log_struct ) elif ( local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_LATENCY == optimization_hint ): return self._handle_cache_miss_minimize_latency_optimization( instance_path, frame_number, log_struct ) return None @property def cache_stats(self) -> local_dicom_slide_cache_types.CacheStats: """Returns cache stats metrics dataclass.""" with self._lock: cache_stats = copy.copy(self._cache_stats) if not self.lru_caching_enabled: cache_stats.frame_cache_memory_size_limit = None else: cache_tools_lru = typing.cast( cachetools.LRUCache, self._dicom_instance_frame_bytes ) cache_stats.frame_cache_memory_size_limit = cache_tools_lru.maxsize cache_stats.current_frame_Cache_memory_size = cache_tools_lru.currsize cache_stats.system_memory = psutil.virtual_memory() return cache_stats def reset_cache_stats(self) -> None: """Resets cache status metrics dataclass.""" with self._lock: self._cache_stats = local_dicom_slide_cache_types.CacheStats() with self._initialization_lock: # Force logger to re-initialize to update logger signatures self._logger = None