# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Util for preemptive DICOM frame caching.
Pods should be set up in GKE to run with user affinity. Without this users won't
preferentially hit the same pod in GKE and the effect of caching will be
reduced. Without a cross pod shared redis memory store the effectiveness of
front-end queuing would be indeterminate. In the current design it is possible
even with GKE user affinity that the front-end conducts transactions across
multiple pods. In the current design each pod has its own cache, feasible with
C2N30 VM, given its relatively large amount of RAM (~120 GB) these machines
have. Each pod controls its own cache and pre-buffers itself by the traffic it
is experiencing.
"""
from __future__ import annotations
import concurrent.futures
import dataclasses
import functools
import itertools
import math
import multiprocessing
import os
import sys
import tempfile
import threading
import time
from typing import Iterator, List, MutableMapping, Optional, Sequence, Set, Tuple, Union
from absl import flags
import psutil
import pydicom
from pathology.dicom_proxy import dicom_instance_request
from pathology.dicom_proxy import dicom_proxy_flags
from pathology.dicom_proxy import dicom_url_util
from pathology.dicom_proxy import enum_types
from pathology.dicom_proxy import frame_retrieval_util
from pathology.dicom_proxy import metadata_util
from pathology.dicom_proxy import redis_cache
from pathology.dicom_proxy import render_frame_params
from pathology.dicom_proxy import user_auth_util
from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.shared_libs.pydicom_version_util import pydicom_version_util
# Types
_DicomInstanceRequest = dicom_instance_request.DicomInstanceRequest
@dataclasses.dataclass(frozen=True)
class _PremptiveCacheLoadingJob:
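  """A background thread or process and the caching operation it executes."""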
thread_or_process: Union[threading.Thread, multiprocessing.Process]
operation: Union[_CacheWholeInstance, _CacheInstanceFrameBlock]
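# Minimum interval, in seconds, between cleanup passes over completed
# preemptive cache loading jobs.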
_ACTIVE_CACHE_LOADING_CLEANUP_TIMEOUT = 60.0
_cache_instance_thread_lock = threading.Lock()
_active_cache_loading_instances: Set[_PremptiveCacheLoadingJob] = set()
_last_cleanup_preemptive_cache_loading = 0.0
def _init_fork_module_state() -> None:
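  """Resets module state in a forked child (see os.register_at_fork)."""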
global _cache_instance_thread_lock
global _active_cache_loading_instances
global _last_cleanup_preemptive_cache_loading
_cache_instance_thread_lock = threading.Lock()
_active_cache_loading_instances = set()
_last_cleanup_preemptive_cache_loading = 0.0
# Structured Log Keys
CACHE_WHOLE_INSTANCE = 'cache_whole_instance'
CPU_LOAD = 'cpu_load'
DICOM_INSTANCE = 'DICOM_SOPInstanceUID'
DICOM_TRANSFER_SYNTAX = 'transfer_syntax'
ELAPSED_TIME = 'elapsed_time(sec)'
NUMBER_OF_FRAMES_CACHED = 'number_of_frame_cached'
FIRST_FRAME = 'first_frame'
FRAMES_REQUESTED = 'DICOM_frame_numbers_requested'
LAST_FRAME = 'last_frame'
MAX_WHOLE_INSTANCE_CACHING_CPU_LOAD = 'MAX_WHOLE_INSTANCE_CACHING_CPU_LOAD'
NUMBER_OF_FRAMES_IN_DICOM_INSTANCE = 'number_of_frames_in_dicom_instance'
PREEMPTIVE_CACHE_FRAME_BLOCK_SIZE = 'PREEMPTIVE_CACHE_FRAME_BLOCK_SIZE'
PREEMPTIVE_INSTANCE_CACHE_MAX = 'PREEMPTIVE_INSTANCE_CACHE_MAX'
PREEMPTIVE_INSTANCE_CACHE_MAX_INSTANCE_FRAME_NUMBER = (
'PREEMPTIVE_INSTANCE_CACHE_MAX_INSTANCE_FRAME_NUMBER'
)
class UnexpectedDicomTransferSyntaxError(Exception):
pass
def get_cache_render_params(
dicom_sop_instance_url: dicom_url_util.DicomSopInstanceUrl,
metadata: metadata_util.DicomInstanceMetadata,
) -> render_frame_params.RenderFrameParams:
"""Returns rendered parameters to use in caching DICOM instance.
Args:
dicom_sop_instance_url: URL to DICOM instance being cached.
metadata: Metadata for DICOM instance being cached.
Returns:
Caching Render Frame Parameters.
Raises:
UnexpectedDicomTransferSyntaxError: Transfer syntax of DICOM instance is
not somthing caching module supports.
"""
render_params = render_frame_params.RenderFrameParams()
render_params.downsample = 1.0
if metadata.is_baseline_jpeg:
render_params.compression = enum_types.Compression.JPEG
elif metadata.is_jpeg2000:
render_params.compression = enum_types.Compression.JPEG2000
elif metadata.is_jpg_transcoded_to_jpegxl:
render_params.compression = enum_types.Compression.JPEG_TRANSCODED_TO_JPEGXL
elif metadata.is_jpegxl:
render_params.compression = enum_types.Compression.JPEGXL
else:
msg = 'Unexpected DICOM pixel encoding (Transfer Syntax).'
cloud_logging_client.error(
msg,
{
DICOM_INSTANCE: dicom_sop_instance_url,
DICOM_TRANSFER_SYNTAX: metadata.dicom_transfer_syntax,
},
)
raise UnexpectedDicomTransferSyntaxError(msg)
return render_params
def set_cached_frame(
redis: redis_cache.RedisCache,
uauth: user_auth_util.AuthSession,
dicom_instance_url: dicom_url_util.DicomSopInstanceUrl,
render_params: render_frame_params.RenderFrameParams,
ttl_sec: Optional[int],
allow_overwrite: bool,
frame_data: bytes,
frame_number: int,
) -> bool:
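  """Stores frame bytes in the Redis cache, keyed by the raw frame request."""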
return redis.set(
frame_retrieval_util.frame_lru_cache_key(
dicom_url_util.download_dicom_raw_frame(
uauth, dicom_instance_url, [frame_number], render_params
)
),
frame_data,
allow_overwrite=allow_overwrite,
ttl_sec=ttl_sec,
)
def _set_cached_frame_from_gen(
redis: redis_cache.RedisCache,
uauth: user_auth_util.AuthSession,
dicom_instance_url: dicom_url_util.DicomSopInstanceUrl,
render_params: render_frame_params.RenderFrameParams,
ttl_sec: Optional[int],
frame_data_tpl: Tuple[bytes, int],
) -> bool:
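  """Caches a (frame_bytes, frame_number) tuple, allowing overwrite."""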
return set_cached_frame(
redis,
uauth,
dicom_instance_url,
render_params,
ttl_sec,
True,
*frame_data_tpl,
)
def _get_encapsulated_data_frame(
encaps_data: Iterator[bytes],
) -> Iterator[Tuple[bytes, int]]:
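  """Yields (frame_bytes, frame_number) for encapsulated (compressed) frames.

  Frame numbers are 1-based to match DICOM frame numbering.
  """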
for index, data in enumerate(encaps_data):
yield (data, index + 1)
def _get_pixel_data_frame(
pixel_data: bytes, number_of_frames: int
) -> Iterator[Tuple[bytes, int]]:
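  """Yields (frame_bytes, frame_number) for fixed-size native pixel data.

  Assumes uncompressed PixelData in which every frame occupies the same
  number of bytes; frame numbers are 1-based to match DICOM frame numbering.
  """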
frame_size = int(len(pixel_data) / number_of_frames)
for index, offset in enumerate(range(0, len(pixel_data), frame_size)):
yield (pixel_data[offset : offset + frame_size], index + 1)
def _cache_entire_instance(dicom_instance: _DicomInstanceRequest) -> int:
"""Caches frames from entire DICOM instance.
Args:
dicom_instance: DICOM Store instance to downsample.
Returns:
Number of frames cached.
"""
with tempfile.TemporaryDirectory() as temp_dir:
path = os.path.join(temp_dir, 'instance.dcm')
dicom_instance.download_instance(path)
try:
render_params = get_cache_render_params(
dicom_instance.dicom_sop_instance_url, dicom_instance.metadata
)
except UnexpectedDicomTransferSyntaxError as exp:
cloud_logging_client.warning(
'Can not cache DICOM instance, instance encoded in unsupported'
' transfer syntax.',
{
'dicomWebInstance': dicom_instance.dicom_sop_instance_url,
'transfer_syntax_uid': (
dicom_instance.metadata.dicom_transfer_syntax
),
},
exp,
)
return 0
uauth = dicom_instance.user_auth
dicom_instance_url = dicom_instance.dicom_sop_instance_url
with pydicom.dcmread(path) as local_pydicom_instance:
number_of_frames = int(local_pydicom_instance.NumberOfFrames)
transfer_syntax_uid = str(
local_pydicom_instance.file_meta.TransferSyntaxUID
)
pixel_data = local_pydicom_instance.PixelData
redis = redis_cache.RedisCache()
frame_ttl = frame_retrieval_util.frame_cache_ttl()
set_frame_partial = functools.partial(
_set_cached_frame_from_gen,
redis,
uauth,
dicom_instance_url,
render_params,
frame_ttl,
)
if metadata_util.is_transfer_syntax_encapsulated(transfer_syntax_uid):
frame_data_generator = _get_encapsulated_data_frame(
pydicom_version_util.generate_frames(pixel_data, number_of_frames)
)
else:
frame_data_generator = _get_pixel_data_frame(pixel_data, number_of_frames)
  # If running locally or processing a small number of frames, just iterate
  # over the frames and set the cache from the current thread.
if redis.is_localhost or number_of_frames < 100:
for frame_data in frame_data_generator:
set_frame_partial(frame_data)
else:
    # If the number of frames is large, create a thread pool and use it to set
    # the Redis cache.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as th_pool:
th_pool.map(set_frame_partial, frame_data_generator)
return number_of_frames
def _background_cache_key(dicom_instance: _DicomInstanceRequest) -> str:
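  """Returns Redis key used to mark an instance as preemptively cached."""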
return f'PreemptiveFrameCaching: {dicom_instance.dicom_sop_instance_url}'
class _CacheWholeInstance:
"""Downloads DICOM instance and loads frame cache in background."""
def __init__(
self,
dicom_instance: _DicomInstanceRequest,
running_in_process: bool,
load_display_thread: Optional[_CacheInstanceFrameBlock],
):
"""Constructor.
Args:
dicom_instance: DICOM instance to load into cache.
running_in_process: True if executed in process, False if executed in a
thread.
load_display_thread: Batch request to quickly load frames that likely
overlap in the display. Only pass if running in thread. If running in
process this parameter should be None.
"""
self._instance_request = dicom_instance
cloud_logging_client.info(
'Preemptive whole instance caching started.',
{
DICOM_INSTANCE: self._instance_request.dicom_sop_instance_url,
CACHE_WHOLE_INSTANCE: True,
},
)
self._running_in_process = running_in_process
self._load_display_thread = load_display_thread
def is_caching_whole_slide_instance(
self, check: _DicomInstanceRequest
) -> bool:
return str(self._instance_request.dicom_sop_instance_url) == str(
check.dicom_sop_instance_url
)
def _get_completion_msg_and_log(
self, start_time: float
) -> Tuple[str, MutableMapping[str, str]]:
"""Returns logging message and structured log."""
elapsed_time = time.time() - start_time
msg = (
'Preemptive whole instance caching stopped; elapsed_time:'
f' {elapsed_time:.3f}(sec)'
)
struct_log = {
DICOM_INSTANCE: str(self._instance_request.dicom_sop_instance_url),
ELAPSED_TIME: str(elapsed_time),
CACHE_WHOLE_INSTANCE: str(True),
NUMBER_OF_FRAMES_IN_DICOM_INSTANCE: str(
self._instance_request.metadata.number_of_frames
),
}
return (msg, struct_log)
def run(self) -> None:
"""Entry point for process that loads a DICOM instance into the cache."""
start_time = time.time()
if not self._running_in_process:
      # Running in a thread.
if (
self._load_display_thread is not None
and self._load_display_thread.total_frames_in_block()
< self._instance_request.metadata.number_of_frames
):
        # Only run the load display thread if it has fewer frames than the
        # whole instance which will be loaded.
self._load_display_thread.run() # pytype: disable=attribute-error
else:
      # Running in a process; initialize flags.
flags.FLAGS(sys.argv, known_only=True)
      # Do not duplicate the logging startup message.
cloud_logging_client.do_not_log_startup_msg()
instance_request = self._instance_request
try:
_cache_entire_instance(instance_request)
msg, struct_log = self._get_completion_msg_and_log(start_time)
number_of_frames = instance_request.metadata.number_of_frames
msg = f'{msg}; NumberOfFrames: {number_of_frames}.'
struct_log['frames_cached'] = number_of_frames
cloud_logging_client.info(msg, struct_log)
except Exception as exp:
redis = redis_cache.RedisCache()
redis.delete(_background_cache_key(instance_request))
msg, struct_log = self._get_completion_msg_and_log(start_time)
cloud_logging_client.error(msg, struct_log, exp)
raise
def _get_frame_request_region(
request_height_px: int,
request_width_px: int,
first_frame: int,
frames_per_row: int,
frames_per_column: int,
frame_row_dim: int,
frame_col_dim: int,
) -> Tuple[int, int, int, int]:
"""Returns inclusive frame coordinates (row, column) for first lst frame.
Args:
request_height_px: Hight in pixels to load.
request_width_px: Width in pixels to load.
first_frame: Frame number to base returned coordinates on.
frames_per_row: Number of frames per-row.
frames_per_column: Number of frames per-column.
frame_row_dim: Pixel dimensions of a frame height.
frame_col_dim: Pixel dimensions of frame width.
Returns:
First frame (row, col), Last Frame (row, col)
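
  Example (hypothetical values): first_frame=10 with frames_per_row=8 maps to
  frame grid coordinate (row=1, col=2). The requested pixel extent is then
  converted to a frame extent, shifted to roughly center large requests, and
  clamped to the instance's frame grid.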
"""
f_row = int(first_frame / frames_per_row)
f_col = int(first_frame % frames_per_row)
request_rows = int(math.ceil(request_height_px / frame_row_dim))
request_columns = int(math.ceil(request_width_px / frame_col_dim))
monitor_resolution = int(
dicom_proxy_flags.PREEMPTIVE_DISPLAY_CACHE_PIXEL_DIM_FLG.value / 2
)
column_bias = int(math.ceil(monitor_resolution / frame_col_dim))
row_bias = int(math.ceil(monitor_resolution / frame_row_dim))
if request_rows > row_bias:
f_row = max(f_row - int((request_rows - row_bias) / 2), 0)
if request_columns > column_bias:
f_col = max(f_col - int((request_columns - column_bias) / 2), 0)
l_row = f_row + request_rows - 1
l_col = f_col + request_columns - 1
if l_col >= frames_per_row:
pad = l_col - (frames_per_row - 1)
f_col = max(0, f_col - pad)
l_col = frames_per_row - 1
if l_row >= frames_per_column:
pad = l_row - (frames_per_column - 1)
f_row = max(0, f_row - pad)
l_row = frames_per_column - 1
return f_row, f_col, l_row, l_col
class _CacheInstanceFrameBlock:
"""Downloads DICOM instance and loads frame cache in background."""
def __init__(
self,
dicom_instance: _DicomInstanceRequest,
requested_frames: List[int],
request_height: int,
request_width: int,
load_display_thread: Optional[_CacheInstanceFrameBlock] = None,
require_first_frame: bool = False,
):
super().__init__()
self._require_first_frame = require_first_frame
self._load_display_thread = load_display_thread
self._instance_request = dicom_instance
frames_per_row = int(
math.ceil(
dicom_instance.metadata.total_pixel_matrix_columns
/ dicom_instance.metadata.columns
)
)
frames_per_column = int(
math.ceil(
dicom_instance.metadata.total_pixel_matrix_rows
/ dicom_instance.metadata.rows
)
)
# Convert request in pixel dimensions to frames
first_frame = min(requested_frames) - 1
f_row, f_col, l_row, l_col = _get_frame_request_region(
request_height,
request_width,
first_frame,
frames_per_row,
frames_per_column,
dicom_instance.metadata.rows,
dicom_instance.metadata.columns,
)
self._first_frame_row = f_row
    self._first_frame_col = f_col + 1  # Add 1 to convert to 1-based indexing.
self._width_frame_count = l_col - f_col + 1
self._height_frame_count = l_row - f_row + 1
self._frames_per_row = frames_per_row
if (
self._load_display_thread is not None
and self.total_frames_in_block()
<= self._load_display_thread.total_frames_in_block()
):
      # Only run the load display thread if it has fewer frames than the
      # frame block which will be loaded.
self._load_display_thread = None
def total_frames_in_block(self) -> int:
return self._width_frame_count * self._height_frame_count # pytype: disable=attribute-error
def is_caching_whole_slide_instance(
self, unused_check: _DicomInstanceRequest
) -> bool:
return False
def get_start_end_frame_numbers_in_row(
self, frame_row_offset: int
) -> Optional[Tuple[int, int]]:
"""Return first and last frame numbers in row of frames.
Args:
frame_row_offset: Starting frame number offset for row of frames.
Returns:
None if no frames in row or Tuple[start, end (not inclusive)]
"""
if frame_row_offset < self._first_frame_row * self._frames_per_row:
return None
if (
frame_row_offset
>= (self._first_frame_row + self._height_frame_count)
* self._frames_per_row
):
return None
first_frame_number = max(self._first_frame_col + frame_row_offset, 1)
last_frame_number = min(
first_frame_number + self._width_frame_count,
self._instance_request.metadata.number_of_frames + 1,
)
return (first_frame_number, last_frame_number)
def _get_frame_number_range_in_row(
self, frame_row_offset: int
) -> Union[Sequence[int], Iterator[int]]:
"""Sequence of frame numbers in row, if defined clips inner regions.
Args:
frame_row_offset: Starting frame number offset for row of frames.
Returns:
Sequence of frame numbers in row, if defined excludes inner regions loaded
by load_display_thread_start_end_frames sub-thread.
"""
start_end_frame_numbers = self.get_start_end_frame_numbers_in_row(
frame_row_offset
)
if start_end_frame_numbers is None:
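      # Row is outside the cached block; return an empty range.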
return range(1, -1)
if self._load_display_thread is not None:
load_display_thread_start_end_frames = (
self._load_display_thread.get_start_end_frame_numbers_in_row( # pytype: disable=attribute-error
frame_row_offset
)
)
if load_display_thread_start_end_frames is not None:
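        # Split the row into the frames before and after the sub-region the
        # display thread is already loading.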
return itertools.chain(
range(
start_end_frame_numbers[0],
load_display_thread_start_end_frames[0],
),
range(
load_display_thread_start_end_frames[1],
start_end_frame_numbers[1],
),
)
return range(start_end_frame_numbers[0], start_end_frame_numbers[1])
def run(self) -> None:
"""Entry point for thread that loads a DICOM frames into the cache."""
if self._load_display_thread is not None:
self._load_display_thread.run() # pytype: disable=attribute-error
log_struct = {
DICOM_INSTANCE: self._instance_request.dicom_sop_instance_url,
CACHE_WHOLE_INSTANCE: False,
NUMBER_OF_FRAMES_IN_DICOM_INSTANCE: (
self._instance_request.metadata.number_of_frames
),
'frames_height': self._height_frame_count,
'frames_width': self._width_frame_count,
'frames_area': self._height_frame_count * self._width_frame_count,
}
try:
start_time = time.time()
instance_request = self._instance_request
uauth = instance_request.user_auth
try:
render_params = get_cache_render_params(
instance_request.dicom_sop_instance_url, instance_request.metadata
)
except UnexpectedDicomTransferSyntaxError as exp:
cloud_logging_client.warning(
'Can not cache DICOM instance, instance encoded in unsupported'
' transfer syntax.',
{
'dicomWebInstance': instance_request.dicom_sop_instance_url,
'transfer_syntax_uid': (
instance_request.metadata.dicom_transfer_syntax
),
},
exp,
)
return
redis = redis_cache.RedisCache()
      # Iterate over the block row by row; per-row frame ranges are clipped to
      # the block and may exclude frames handled by the display sub-thread.
frame_list = []
frame_counter = 0
for frame_row_index_offset in range(
self._first_frame_row * self._frames_per_row,
(self._first_frame_row + self._height_frame_count)
* self._frames_per_row,
self._frames_per_row,
):
for frame_number in self._get_frame_number_range_in_row(
frame_row_index_offset
):
frame_counter += 1
          # Try to write a short-lived cache-loading placeholder without
          # overwriting. Failure means the frame is already cached or being
          # loaded; skip it to avoid (but not prevent) duplicate cache loads.
          if not set_cached_frame(
              redis,
              uauth,
              instance_request.dicom_sop_instance_url,
              render_params,
              5,  # ttl_sec: short-lived placeholder.
              False,  # allow_overwrite: only set if frame is not cached.
              frame_retrieval_util.CACHE_LOADING_FRAME_BYTES,
              frame_number,
          ):
            if self._require_first_frame and frame_counter == 1:
              return
            continue
frame_list.append(frame_number)
if len(frame_list) < 2:
return
log_struct[NUMBER_OF_FRAMES_CACHED] = len(frame_list)
instance_request.get_raw_dicom_frames(render_params, frame_list)
elapsed_time = time.time() - start_time
log_struct[ELAPSED_TIME] = elapsed_time
cloud_logging_client.info(
f'Done Frame Block Cache; Loaded: {len(frame_list)}(# of frames);'
f' Elapsed time: {elapsed_time:.3f}(sec)',
log_struct,
)
except Exception as exp:
cloud_logging_client.info(
'Unexpected exception occurred loading frame block cache.',
log_struct,
exp,
)
raise
def _remove_dead_cache_threads() -> None:
"""Removes no long running threads from processes thread set.
Not thread safe must be called within _cache_instance_thread_lock.
"""
for job in [
job
for job in _active_cache_loading_instances
if not job.thread_or_process.is_alive()
]:
_active_cache_loading_instances.remove(job)
def wait_for_cache_loading_threads() -> None:
"""Waits for all cache loading threads to complete."""
for job in _active_cache_loading_instances:
if job.thread_or_process.is_alive():
job.thread_or_process.join()
_remove_dead_cache_threads()
def _is_another_thread_caching_whole_slide_instance(
dicom_instance: _DicomInstanceRequest,
) -> bool:
"""Checks if instance is being processed by another process thread.
Not thread safe must be called within _cache_instance_thread_lock.
Highly unlikely, but possible redis cache could be lost due to LRU purge.
Redis set is primary protection and protects all processes on the machine.
This check is cheap and validates again that the process is not currently
caching the dicom instance in anothe thread.
Args:
dicom_instance: DICOM instance to check.
Returns:
True if instance is being ingested in another thread.
"""
for loading_cache in _active_cache_loading_instances:
if loading_cache.operation.is_caching_whole_slide_instance(dicom_instance):
return True
return False
def _create_threaded_cache_loader(
frame_loader: Union[_CacheWholeInstance, _CacheInstanceFrameBlock],
) -> threading.Thread:
return threading.Thread(target=frame_loader.run, daemon=True)
def _start_display_thread(
load_display_thread: Optional[_CacheInstanceFrameBlock],
) -> None:
if load_display_thread is not None:
_create_threaded_cache_loader(load_display_thread).start()
def cleanup_preemptive_cache_loading_set() -> None:
"""Cleansup completed preemptive cache loading jobs.
The purpose of this method is to enable the main loop to cleanup and log
messages returned from Processes loading whole DICOM instances into the cache.
"""
if not _cache_instance_thread_lock.acquire(blocking=False):
return
try:
if not _active_cache_loading_instances:
return
current_time = time.time()
global _last_cleanup_preemptive_cache_loading
if (
current_time - _last_cleanup_preemptive_cache_loading
> _ACTIVE_CACHE_LOADING_CLEANUP_TIMEOUT
):
_last_cleanup_preemptive_cache_loading = current_time
_remove_dead_cache_threads()
finally:
_cache_instance_thread_lock.release()
def _get_instance_cache_mp_class():
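  # Whole-instance caching runs in a separate process; a 'spawn' context
  # gives the child a fresh interpreter (presumably avoiding locks and open
  # connections inherited from the serving process).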
ctx = multiprocessing.get_context('spawn')
return ctx.Process
def _get_max_precent_memory_used_for_whole_slide_caching() -> float:
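  """Returns max memory fraction (0.0-1.0) for whole-slide caching."""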
max_percent_memory = float(
dicom_proxy_flags.MAX_WHOLE_INSTANCE_CACHING_MEMORY_PRECENT_LOAD_FLG.value
)
return max(0.0, min(1.0, max_percent_memory / 100.0))
def _force_cache_whole_instance_in_memory(
dicom_instance: _DicomInstanceRequest,
frames_retrieved_in_block_request: int,
) -> bool:
  # Returns True if the whole instance is small enough to be cached at once
  # using threaded in-memory whole-instance caching.
return (
dicom_instance.metadata.number_of_frames
<= frames_retrieved_in_block_request
)
def cache_instance_frames_in_background(
dicom_instance: _DicomInstanceRequest,
frame_indexes_requested: List[int],
) -> bool:
"""Launches thread to cache all instance frames.
Args:
dicom_instance: DICOM instance to cache.
frame_indexes_requested: Requested frames which triggered caching.
Returns:
True if background frame caching thread started started.
"""
if not frame_indexes_requested:
return False
if dicom_proxy_flags.DISABLE_PREEMPTIVE_INSTANCE_FRAME_CACHE_FLG.value:
return False
if not (
dicom_instance.metadata.is_baseline_jpeg
or dicom_instance.metadata.is_jpeg2000
or dicom_instance.metadata.is_jpegxl
or dicom_instance.metadata.is_jpg_transcoded_to_jpegxl
):
return False
if (
dicom_instance.metadata.number_of_frames
< dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_MIN_INSTANCE_FRAME_NUMBER_FLG.value
):
return False
cache_whole_instance = (
dicom_proxy_flags.ENABLE_PREEMPTIVE_WHOLEINSTANCE_CACHING_FLG.value
)
premptive_block_px_dim = (
dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_BLOCK_PIXEL_DIM_FLG.value
)
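  # Number of frames covered by one preemptive block request (block pixel
  # dimensions converted to a frame count).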
frames_retrieved_in_block_request = math.ceil(
premptive_block_px_dim / dicom_instance.metadata.rows
) * math.ceil(premptive_block_px_dim / dicom_instance.metadata.columns)
if (
cache_whole_instance
and dicom_instance.metadata.number_of_frames
> dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_MAX_INSTANCE_FRAME_NUMBER_FLG.value
):
    # The whole instance was going to be cached, but its frame count exceeds
    # the threshold; fall back to frame batch caching.
cache_whole_instance = False
cloud_logging_client.debug(
'Performance optimization: Number of frames in DICOM instance is > '
' PREEMPTIVE_INSTANCE_CACHE_MAX_INSTANCE_FRAME_NUMBER; enabling frame'
' batch caching.',
{
DICOM_INSTANCE: dicom_instance.dicom_sop_instance_url,
NUMBER_OF_FRAMES_IN_DICOM_INSTANCE: (
dicom_instance.metadata.number_of_frames
),
PREEMPTIVE_INSTANCE_CACHE_MAX_INSTANCE_FRAME_NUMBER: (
dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_MAX_INSTANCE_FRAME_NUMBER_FLG.value
),
},
)
  # If the instance is equal to or smaller than a batch request, use threaded
  # whole-instance retrieval; this gives the greatest performance for small
  # instances.
force_cache_whole_instance_in_memory = _force_cache_whole_instance_in_memory(
dicom_instance, frames_retrieved_in_block_request
)
if cache_whole_instance and not force_cache_whole_instance_in_memory:
    # If CPU load or memory use on the DICOM Proxy is high, cache a frame
    # block instead to ensure a faster response when instances with many
    # frames are being cached.
max_cpu_load = max(
0,
min(
dicom_proxy_flags.MAX_WHOLE_INSTANCE_CACHING_CPU_LOAD_FLG.value,
100,
),
)
if max_cpu_load < 100:
try:
cpu_count = float(psutil.cpu_count())
cpu_load = [load / cpu_count * 100 for load in psutil.getloadavg()[:2]]
if max(cpu_load) > max_cpu_load:
cache_whole_instance = False
cloud_logging_client.debug(
'Performance optimization: CPU load exceeds '
' MAX_WHOLE_INSTANCE_CACHING_CPU_LOAD; enabling frame batch'
' caching.',
{
DICOM_INSTANCE: dicom_instance.dicom_sop_instance_url,
CPU_LOAD: cpu_load,
MAX_WHOLE_INSTANCE_CACHING_CPU_LOAD: (
dicom_proxy_flags.MAX_WHOLE_INSTANCE_CACHING_CPU_LOAD_FLG.value
),
},
)
else:
percent_memory_used = psutil.Process().memory_percent()
max_percent_memory = (
_get_max_precent_memory_used_for_whole_slide_caching()
)
if percent_memory_used > max_percent_memory:
cache_whole_instance = False
            percent_threshold = int(100.0 * max_percent_memory)
            cloud_logging_client.debug(
                f'Memory utilization exceeds {percent_threshold}%; enabling'
                ' frame batch caching.',
                {'percent_system_memory_used': percent_memory_used},
)
except OSError:
pass
if (
dicom_instance.metadata.number_of_frames
< dicom_proxy_flags.PREEMPTIVE_DISPLAY_CACHE_MIN_INSTANCE_FRAME_NUMBER_FLG.value
):
load_display_thread = None
else:
load_display_thread = _CacheInstanceFrameBlock(
dicom_instance,
frame_indexes_requested,
dicom_proxy_flags.PREEMPTIVE_DISPLAY_CACHE_PIXEL_DIM_FLG.value,
dicom_proxy_flags.PREEMPTIVE_DISPLAY_CACHE_PIXEL_DIM_FLG.value,
require_first_frame=True,
)
redis = redis_cache.RedisCache()
redis_cache_key = _background_cache_key(dicom_instance)
  # Try to set the key/value pair. This only succeeds if the key does not
  # already exist, which avoids re-caching the same instance under high usage.
  # The key is deleted on error, after the TTL expires, or on completion of
  # batch frame caching.
if not redis.set(
redis_cache_key,
'Running',
allow_overwrite=False,
ttl_sec=dicom_proxy_flags.PREEMPTIVE_WHOLEINSTANCE_RECACHING_TTL_FLG.value
if cache_whole_instance or force_cache_whole_instance_in_memory
else 3,
):
_start_display_thread(load_display_thread)
return False
with _cache_instance_thread_lock:
try:
_remove_dead_cache_threads()
if _is_another_thread_caching_whole_slide_instance(dicom_instance):
        # This test is somewhat redundant with the Redis check; it validates
        # that the whole instance is not currently being cached in case the
        # Redis key was removed by timeout or pseudo-LRU ejection.
        # Keep the redis_cache_key lock in place.
_start_display_thread(load_display_thread)
return False
if (
len(_active_cache_loading_instances)
>= dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_MAX_THREADS_FLG.value
):
cloud_logging_client.info(
(
'Concurrent DICOM instance read threshold hit; preemptive'
' caching not started for instance.'
),
{
DICOM_INSTANCE: dicom_instance.dicom_sop_instance_url,
PREEMPTIVE_INSTANCE_CACHE_MAX: (
dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_MAX_THREADS_FLG.value
),
},
)
# delete redis_cache_key to enable instance caching to be retried
# without delay.
redis.delete(redis_cache_key)
_start_display_thread(load_display_thread)
return False
if force_cache_whole_instance_in_memory:
        # The instance is equal to or smaller than a batch request; use
        # threaded whole-instance retrieval for the greatest performance.
ch_instance = _CacheWholeInstance(
dicom_instance,
running_in_process=False,
load_display_thread=load_display_thread,
)
cache_instance_thread = _create_threaded_cache_loader(ch_instance)
elif cache_whole_instance:
ch_instance = _CacheWholeInstance(
dicom_instance, running_in_process=True, load_display_thread=None
)
cache_mp_obj = _get_instance_cache_mp_class()
cache_instance_thread = cache_mp_obj(
target=ch_instance.run, daemon=True
)
_start_display_thread(load_display_thread)
else:
ch_instance = _CacheInstanceFrameBlock(
dicom_instance,
frame_indexes_requested,
dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_BLOCK_PIXEL_DIM_FLG.value,
dicom_proxy_flags.PREEMPTIVE_INSTANCE_CACHE_BLOCK_PIXEL_DIM_FLG.value,
load_display_thread=load_display_thread,
)
cache_instance_thread = _create_threaded_cache_loader(ch_instance)
cache_instance_thread.start()
_active_cache_loading_instances.add(
_PremptiveCacheLoadingJob(cache_instance_thread, ch_instance)
)
return True
except Exception as exp:
redis.delete(redis_cache_key) # Unblock instance and frame block cache.
cloud_logging_client.error(
'Unexpected error.',
{
DICOM_INSTANCE: dicom_instance.dicom_sop_instance_url,
CACHE_WHOLE_INSTANCE: cache_whole_instance,
FRAMES_REQUESTED: frame_indexes_requested,
},
exp,
)
raise
finally:
if not cache_whole_instance and not force_cache_whole_instance_in_memory:
redis.delete(redis_cache_key) # Unblock caching another frame block.
os.register_at_fork(after_in_child=_init_fork_module_state)