pathology/dicom_proxy/shared_test_util.py (185 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Shared test utilities."""
from __future__ import annotations
import contextlib
import copy
import http
import io
import json
import os
from typing import Any, Mapping, MutableMapping, Optional
from unittest import mock
import numpy as np
import PIL
import pydicom
from pathology.dicom_proxy import flask_util
from pathology.dicom_proxy import metadata_util
from pathology.dicom_proxy import parameters_exceptions_and_return_types
from pathology.dicom_proxy import pydicom_single_instance_read_cache
from pathology.dicom_proxy import render_frame_params
# Types
# Short module-private aliases for types declared in sibling dicom_proxy
# modules; they keep the annotations and constructor calls below compact.
_DicomInstanceMetadata = metadata_util.DicomInstanceMetadata
_PyDicomSingleInstanceCache = (
pydicom_single_instance_read_cache.PyDicomSingleInstanceCache
)
_LocalDicomInstance = parameters_exceptions_and_return_types.LocalDicomInstance
_RenderFrameParams = render_frame_params.RenderFrameParams
def _http_status_response(status: http.HTTPStatus) -> str:
  """Formats an HTTP status as an upper-cased '<code> <phrase>' string."""
  code_and_phrase = ' '.join((str(status.value), status.phrase))
  return code_and_phrase.upper()


def http_not_found_status() -> str:
  """Returns the formatted status string for HTTPStatus.NOT_FOUND."""
  not_found = http.HTTPStatus.NOT_FOUND
  return _http_status_response(not_found)


def http_ok_status() -> str:
  """Returns the formatted status string for HTTPStatus.OK."""
  ok = http.HTTPStatus.OK
  return _http_status_response(ok)


def http_bad_request_status() -> str:
  """Returns the formatted status string for HTTPStatus.BAD_REQUEST."""
  bad_request = http.HTTPStatus.BAD_REQUEST
  return _http_status_response(bad_request)
class MockFlaskRequest(contextlib.ExitStack):
  """Context manager that patches flask_util request accessors.

  While the context is active, flask_util.get_headers, get_method,
  get_key_args_list, get_data and get_path are replaced by autospec'd mocks
  which return the values supplied to the constructor. The patches are
  registered on this ExitStack, so they are undone when the context exits.
  """

  def __init__(
      self,
      args: Optional[Mapping[str, str]] = None,
      data: bytes = b'',
      headers: Optional[Mapping[str, str]] = None,
      method: str = 'GET',
      path: str = '',
  ):
    """Stores the values the patched flask_util accessors will return.

    Args:
      args: Value returned by flask_util.get_key_args_list; empty dict if None.
      data: Value returned by flask_util.get_data.
      headers: Value returned by flask_util.get_headers; empty dict if None.
      method: Value returned by flask_util.get_method.
      path: Value returned by flask_util.get_path.
    """
    super().__init__()
    # Shallow copies keep the stored mappings distinct from the caller's
    # objects.
    self._args = {} if args is None else copy.copy(args)
    self._data = data
    self._headers = {} if headers is None else copy.copy(headers)
    self._method = method
    self._path = path

  def __enter__(self) -> MockFlaskRequest:
    """Installs the flask_util patches and returns the entered stack."""
    stack = super().__enter__()
    # (flask_util attribute name, mocked return value), in patch order.
    accessor_values = (
        ('get_headers', self._headers),
        ('get_method', self._method),
        ('get_key_args_list', self._args),
        ('get_data', self._data),
        ('get_path', self._path),
    )
    for accessor_name, mocked_value in accessor_values:
      patcher = mock.patch.object(
          flask_util, accessor_name, autospec=True, return_value=mocked_value
      )
      stack.enter_context(patcher)
    return stack
class RedisMock:
  """In-memory stand-in for the subset of a Redis client used in tests.

  Values are kept in a plain dict; expiration is not simulated.
  """

  def __init__(self):
    """Creates an empty mock cache."""
    self._redis = {}

  def clear(self):
    """Removes every stored key."""
    self._redis.clear()

  def get(self, key: str) -> Any:
    """Returns the value stored at key, or None if the key is absent."""
    return self._redis.get(key)

  def set(
      self,
      key: str,
      value: bytes,
      nx: bool = False,
      ex: Optional[int] = None,
  ) -> Optional[str]:
    """Stores value at key.

    Args:
      key: Key to write.
      value: Value to store.
      nx: If True, write only when the key is not already present. Defaults to
        False (unconditional write) so callers may omit it, as with redis-py.
      ex: Expiration; accepted for signature compatibility and ignored.

    Returns:
      'ok' if the value was stored, None if nx blocked the write.
    """
    del ex  # Expiration is not simulated by this mock.
    if nx and key in self._redis:
      return None
    self._redis[key] = value
    return 'ok'

  def delete(self, key: str) -> int:
    """Deletes key; returns 1 if it was present, otherwise 0."""
    try:
      del self._redis[key]
    except KeyError:
      return 0
    return 1

  def __len__(self) -> int:
    """Returns the number of stored keys."""
    return len(self._redis)
def _get_decoded_image_bytes(compressed_image_bytes: bytes) -> np.ndarray:
  """Decodes a compressed image into a flat uint8 array of its pixel bytes.

  Args:
    compressed_image_bytes: Encoded image in a format PIL can open.

  Returns:
    One-dimensional uint8 array holding the decoded image's raw bytes.
  """
  # `import PIL` alone does not load the PIL.Image submodule; import it here
  # so this helper does not rely on some other module having imported it.
  from PIL import Image  # pylint: disable=g-import-not-at-top

  with io.BytesIO(compressed_image_bytes) as byte_buff:
    # Close the image once its pixel data has been extracted.
    with Image.open(byte_buff) as image:
      raw_pixels = image.tobytes()
  # flatten() returns a writable copy; frombuffer alone yields a read-only
  # view over the bytes object.
  return np.frombuffer(raw_pixels, dtype=np.uint8).flatten()
def rgb_image_almost_equal(
    image_1: bytes, image_2: bytes, threshold: int = 3
) -> bool:
  """Tests whether two encoded images decode to nearly identical bytes.

  Args:
    image_1: First encoded image.
    image_2: Second encoded image.
    threshold: Exclusive upper bound on the absolute per-byte difference.

  Returns:
    True if every decoded byte differs by less than threshold.
  """
  # Decoded pixels are uint8. Unsigned subtraction wraps (1 - 2 == 255), so
  # widen to a signed type first; otherwise close images compare as unequal
  # and differences of 254/255 compare as equal.
  pixels_1 = _get_decoded_image_bytes(image_1).astype(np.int16)
  pixels_2 = _get_decoded_image_bytes(image_2).astype(np.int16)
  max_abs_delta_ok = np.all(np.abs(pixels_1 - pixels_2) < threshold)
  # Convert numpy.bool_ to the builtin bool promised by the annotation.
  return bool(max_abs_delta_ok)
def get_dir_path(*args: str) -> str:
  """Joins args onto the directory that contains this module's file."""
  module_dir = os.path.dirname(__file__)
  return os.path.join(module_dir, *args)
def get_testdir_path(*args: str) -> str:
  """Joins args onto the 'testdata' directory located next to this module."""
  testdata_parts = ('testdata',) + args
  return get_dir_path(*testdata_parts)
def jpeg_encoded_dicom_instance_test_path() -> str:
  """Returns the testdata path of the small JPEG WSI DICOM instance."""
  filename = 'multi_frame_jpeg_camelyon_challenge_image.dcm'
  return get_testdir_path(filename)
def jpeg_encoded_dicom_instance() -> pydicom.FileDataset:
  """Reads the JPEG encoded WSI DICOM test instance with pydicom."""
  instance_path = jpeg_encoded_dicom_instance_test_path()
  return pydicom.dcmread(instance_path)
def jpeg_encoded_dicom_instance_json() -> MutableMapping[str, Any]:
  """Returns DICOM JSON for the JPEG encoded WSI DICOM test instance.

  The result merges the dataset's JSON with its file meta JSON; on a key
  collision the file meta entry wins.
  """
  with jpeg_encoded_dicom_instance() as dataset:
    file_meta_text = dataset.file_meta.to_json()
    dataset_text = dataset.to_json()
  merged = json.loads(dataset_text)
  merged.update(json.loads(file_meta_text))
  return merged
def jpeg_encoded_dicom_instance_metadata(
    changes: Optional[Mapping[str, Any]] = None
) -> _DicomInstanceMetadata:
  """Returns DICOM Proxy metadata for the small WSI DICOM test instance.

  Args:
    changes: Optional mapping passed to the metadata's replace() method; when
      None the metadata read from the file is returned as is.
  """
  instance_path = jpeg_encoded_dicom_instance_test_path()
  metadata = metadata_util.get_instance_metadata_from_local_instance(
      instance_path
  )
  return metadata if changes is None else metadata.replace(changes)
def jpeg_encoded_pydicom_instance_cache(
    metadata: Optional[Mapping[str, Any]] = None
) -> _PyDicomSingleInstanceCache:
  """Returns a pydicom instance cache for the small WSI DICOM test instance.

  Args:
    metadata: Optional mapping passed to replace() on the cache's metadata;
      when None the cache's metadata is left as constructed.
  """
  file_path = pydicom_single_instance_read_cache.PyDicomFilePath(
      jpeg_encoded_dicom_instance_test_path()
  )
  cache = _PyDicomSingleInstanceCache(file_path)
  if metadata is None:
    return cache
  # Tests override the cache's private metadata directly.
  # pylint: disable=protected-access
  cache._metadata = cache._metadata.replace(metadata)
  # pylint: enable=protected-access
  return cache
def jpeg_encoded_dicom_local_instance(
    metadata: Optional[Mapping[str, Any]] = None
) -> _LocalDicomInstance:
  """Returns a local DICOM instance wrapping the JPEG test instance cache.

  Args:
    metadata: Optional metadata overrides forwarded to
      jpeg_encoded_pydicom_instance_cache; None is forwarded as an empty dict.
  """
  overrides = {} if metadata is None else metadata
  instance_cache = jpeg_encoded_pydicom_instance_cache(overrides)
  return _LocalDicomInstance(instance_cache)
def mock_multi_frame_test_metadata() -> _DicomInstanceMetadata:
  """Returns metadata which mocks a multi-frame image using a small instance.

  The imaging data of the underlying instance does not match the mocked
  metadata.
  """
  # The source metadata describes an instance with a single frame. The
  # overrides below make it describe a tiled image (65 x 55 pixels) composed
  # of 5 x 5 pixel frames, which lets test cases perform meaningful DICOM
  # frame coordinate / pixel calculations.
  tiled_overrides = {
      'rows': 5,
      'columns': 5,
      'total_pixel_matrix_columns': 65,
      'total_pixel_matrix_rows': 55,
  }
  return jpeg_encoded_dicom_instance_metadata(tiled_overrides)
def jpeg2000_dicom_cache() -> _PyDicomSingleInstanceCache:
  """Returns a pydicom instance cache for the jpeg2000 testdata instance."""
  instance_path = get_testdir_path(
      'multi_frame_jpeg2000_camelyon_challenge_image.dcm'
  )
  file_path = pydicom_single_instance_read_cache.PyDicomFilePath(
      instance_path
  )
  return _PyDicomSingleInstanceCache(file_path)
def jpeg2000_encoded_dicom_local_instance() -> _LocalDicomInstance:
  """Returns a local DICOM instance wrapping the jpeg2000 instance cache."""
  instance_cache = jpeg2000_dicom_cache()
  return _LocalDicomInstance(instance_cache)
def wsi_dicom_annotation_path() -> str:
  """Returns the testdata path of 'wsi_annotation.dcm'."""
  filename = 'wsi_annotation.dcm'
  return get_testdir_path(filename)
def wsi_dicom_annotation_instance() -> pydicom.FileDataset:
  """Reads the 'wsi_annotation.dcm' testdata file with pydicom."""
  annotation_path = wsi_dicom_annotation_path()
  return pydicom.dcmread(annotation_path)
def mock_annotation_dicom_json_path() -> str:
  """Returns the testdata path of 'mock_annotation_dicom.json'."""
  filename = 'mock_annotation_dicom.json'
  return get_testdir_path(filename)
def mock_jpeg_dicom_json_path() -> str:
  """Returns the testdata path of 'mock_jpeg_dicom.json'."""
  filename = 'mock_jpeg_dicom.json'
  return get_testdir_path(filename)