ez_wsi_dicomweb/dicom_slide.py (1,680 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""An interface for accessing a Pathology slide stored in a DICOMStore.
Whole-Slide Images (WSI) (https://dicom.nema.org/dicom/dicomwsi) in digital
pathology are scans of glass slides typically created at a magnification of
20x or 40x using microscopes. Pixels typically represent between
0.275 µm - 0.5 µm of a slide but the exact size depends on the scanner camera
resolution. With typical dimensions of about 15mm x 15mm for tissue slides,
WSIs are often about 50,000 x 50,000 pixels for a 20x magnification or
100,000 x 100,000 pixels for a 40x magnification. The images are usually stored
as an image pyramid to allow for fast access to different magnification levels.
The class layout/hierarchy is as follows:
DicomSlide: a single (whole) slide image typically represented as
an image pyramid along with slide level metadata.
|--> DicomImage: an image at a specific magnification in the image pyramid of
a slide along with associated metadata.
|--> DicomImagePatch: a rectangular patch/tile of an Image
"""
from __future__ import annotations
import abc
from collections.abc import Sequence
import copy
import dataclasses
import heapq
import importlib.resources
import io
import itertools
import json
import logging
import math
from typing import Any, Collection, Dict, Iterator, List, Mapping, Optional, Set, Tuple, Union
import cv2
from ez_wsi_dicomweb import credential_factory
from ez_wsi_dicomweb import dicom_frame_decoder
from ez_wsi_dicomweb import dicom_web_interface
from ez_wsi_dicomweb import ez_wsi_errors
from ez_wsi_dicomweb import ez_wsi_logging_factory
from ez_wsi_dicomweb import local_dicom_slide_cache
from ez_wsi_dicomweb import local_dicom_slide_cache_types
from ez_wsi_dicomweb import pixel_spacing as pixel_spacing_module
from ez_wsi_dicomweb import slide_level_map
from ez_wsi_dicomweb.ml_toolkit import dicom_path
from ez_wsi_dicomweb.ml_toolkit import tags
import google.auth
import numpy as np
import PIL
from PIL import ImageCms
import pydicom
# JSON slide metadata keys
_SOP_CLASS_UID = 'sop_class_uid'
LEVEL_MAP = 'level_map'
_SLIDE_PATH = 'slide_path'
UNTILED_MICROSCOPE_IMAGE_MAP = 'untiled_microscope_image_map'
# ICC Profile Color Correction
# _RAW is the PIL decoder name and _RGB the PIL image mode used when applying
# ICC color transforms; _SRGB names the color space passed to
# ImageCms.createProfile for the fallback sRGB profile.
_RAW = 'raw'
_RGB = 'RGB'
_SRGB = 'sRGB'
# Annotation IOD
# https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE = '1.2.840.10008.5.1.4.1.1.91.1'
# Module level aliases for types defined in slide_level_map.
ImageDimensions = slide_level_map.ImageDimensions
Level = slide_level_map.Level
ResizedLevel = slide_level_map.ResizedLevel
def _read_icc_profile(dir_name: str, filename: str) -> bytes:
  """Reads an ICC profile bundled as a `third_party` package resource.

  Args:
    dir_name: Directory, relative to the `third_party` package root, that
      holds the profile.
    filename: File name of the ICC profile.

  Returns:
    Raw bytes of the ICC profile file.
  """
  # https://setuptools.pypa.io/en/latest/userguide/datafiles.html
  profile_resource = importlib.resources.files('third_party').joinpath(
      dir_name, filename
  )
  return profile_resource.read_bytes()
def get_srgb_icc_profile_bytes() -> bytes:
  """Returns sRGB ICC Profile bytes.

  The sRGB v4 profile bundled as a package resource is preferred. If the
  resource cannot be located, the sRGB profile generated by Pillow is returned
  instead.

  Returns:
    ICC profile bytes describing the sRGB color space.
  """
  try:
    return _read_icc_profile('srgb', 'sRGB_v4_ICC_preference.icc')
  except (FileNotFoundError, ModuleNotFoundError):
    # FileNotFoundError: the profile file is missing from the package data.
    # ModuleNotFoundError: importlib.resources.files() could not import the
    # `third_party` package at all. Both mean the bundled profile is
    # unavailable, so fall back to Pillow's built-in sRGB profile.
    return ImageCms.ImageCmsProfile(ImageCms.createProfile(_SRGB)).tobytes()
def get_adobergb_icc_profile_bytes() -> bytes:
  """Returns the raw bytes of the bundled AdobeRGB ICC Profile."""
  adobergb_profile_bytes = _read_icc_profile(
      'adobergb1998', 'AdobeRGB1998.icc'
  )
  return adobergb_profile_bytes
def get_rommrgb_icc_profile_bytes() -> bytes:
  """Returns the raw bytes of the bundled ROMM RGB ICC Profile."""
  rommrgb_profile_bytes = _read_icc_profile(
      'rommrgb', 'ISO22028-2_ROMM-RGB.icc'
  )
  return rommrgb_profile_bytes
def _get_cmsprofile_from_iccprofile_bytes(b: bytes) -> ImageCms.ImageCmsProfile:
  """Opens ICC Profile bytes as a profile via ImageCms.getOpenProfile.

  Args:
    b: Raw ICC profile bytes.

  Returns:
    Profile object opened from an in-memory stream over the bytes.
  """
  profile_stream = io.BytesIO(b)
  return ImageCms.getOpenProfile(profile_stream)
def get_srgb_icc_profile() -> ImageCms.core.CmsProfile:
  """Returns the sRGB ICC Profile opened from its profile bytes."""
  srgb_bytes = get_srgb_icc_profile_bytes()
  return _get_cmsprofile_from_iccprofile_bytes(srgb_bytes)
def get_adobergb_icc_profile() -> ImageCms.core.CmsProfile:
  """Returns the AdobeRGB ICC Profile opened from its profile bytes."""
  adobergb_bytes = get_adobergb_icc_profile_bytes()
  return _get_cmsprofile_from_iccprofile_bytes(adobergb_bytes)
def get_rommrgb_icc_profile() -> ImageCms.core.CmsProfile:
  """Returns the ROMM RGB ICC Profile opened from its profile bytes."""
  rommrgb_bytes = get_rommrgb_icc_profile_bytes()
  return _get_cmsprofile_from_iccprofile_bytes(rommrgb_bytes)
@dataclasses.dataclass(frozen=True)
class _PatchIntersection:
x_origin: int # The upper leftmost x coordinate of the patch intersection.
y_origin: int # The upper leftmost y coordinate of the patch intersection.
width: int
height: int
@dataclasses.dataclass(frozen=True)
class Frame:
  """A frame in a DICOM, in pixel unit."""

  # The upper leftmost x coordinate of the frame.
  x_origin: int
  # The upper leftmost y coordinate of the frame.
  y_origin: int
  # Frame extent along x.
  width: int
  # Frame extent along y.
  height: int
  # Pixel data of the frame.
  image_np: np.ndarray
# DICOM transfer syntaxes that do not require transcoding and are natively
# compatible with client side decoding.
_SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS = (
    '1.2.840.10008.1.2.1',  # Explicit VR Little Endian
    '1.2.840.10008.1.2',  # Implicit VR Little Endian
    '1.2.840.10008.1.2.1.99',  # Deflated Explicit VR Little Endian
)
def is_client_side_pixel_decoding_supported(transfer_syntax: str) -> bool:
  """Tests whether pixels in a transfer syntax can be decoded client side.

  Args:
    transfer_syntax: DICOM transfer syntax uid.

  Returns:
    True if the frame decoder can decompress the transfer syntax or the
    transfer syntax is one of the supported raw (uncompressed) encodings.
  """
  can_decompress = dicom_frame_decoder.can_decompress_dicom_transfer_syntax(
      transfer_syntax
  )
  if can_decompress:
    return can_decompress
  return transfer_syntax in _SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS
def _pixel_spacing_to_level(
    slide: DicomSlide,
    pixel_spacing: pixel_spacing_module.PixelSpacing,
    maximum_downsample: float = 0.0,
) -> slide_level_map.Level:
  """Returns the slide pyramid level selected for a pixel spacing.

  Logs a notice that addressing imaging by pixel spacing will be deprecated.

  Args:
    slide: Slide whose pyramid is searched.
    pixel_spacing: Pixel spacing to find a level for.
    maximum_downsample: Forwarded to slide.get_level_by_pixel_spacing.

  Returns:
    Level returned by slide.get_level_by_pixel_spacing.

  Raises:
    PixelSpacingLevelNotFoundError: If the slide returns no level for the pixel
      spacing.
  """
  logging.info(
      'Pixel spacing parameter will be deprecated in future versions. Modify'
      ' code to define source imaging using slide_level_map.Level.'
  )
  matched_level = slide.get_level_by_pixel_spacing(
      pixel_spacing, maximum_downsample=maximum_downsample
  )
  if matched_level is not None:
    return matched_level
  raise ez_wsi_errors.PixelSpacingLevelNotFoundError(
      'No pyramid level found with pixel spacing:'
      f' ~{pixel_spacing.pixel_spacing_mm} mm/px'
  )
class DicomImage:
  """Represents an image at a specific pixel spacing in a DicomSlide."""

  def __init__(
      self,
      level: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
      ],
      source: _DicomSeries,
  ):
    """Constructor for DicomImage.

    Args:
      level: Pyramid level (native or resized) the image is rendered at.
      source: The series this image belongs to.
    """
    self._source = source
    self._output_image_level = level

  @property
  def source(self) -> _DicomSeries:
    """Series the image belongs to."""
    return self._source

  @property
  def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Pixel spacing of the image's level."""
    return self._output_image_level.pixel_spacing

  @property
  def width(self) -> int:
    """Width of the image's level."""
    return self._output_image_level.width

  @property
  def height(self) -> int:
    """Height of the image's level."""
    return self._output_image_level.height

  def get_image_as_patch(
      self, require_fully_in_source_image: bool = False
  ) -> DicomPatch:
    """Returns a patch anchored at (0, 0) that spans the whole image.

    Args:
      require_fully_in_source_image: Forwarded to the source's get_patch.

    Returns:
      Patch covering the full width and height of the image's level.
    """
    output_level = self._output_image_level
    return self._source.get_patch(
        output_level,
        x=0,
        y=0,
        width=output_level.width,
        height=output_level.height,
        require_fully_in_source_image=require_fully_in_source_image,
    )

  def image_bytes(
      self, color_transform: Optional[ImageCms.ImageCmsTransform] = None
  ) -> np.ndarray:
    """Loads the pixel bytes of the DICOM Image.

    Args:
      color_transform: Optional ICC Profile color transformation to perform on
        image.

    Returns:
      Numpy array representing the DICOM Image.
    """
    # The image is fetched through the patch implementation: a single patch
    # starting at (0, 0) that covers the entire image.
    whole_image_patch = self.get_image_as_patch()
    return whole_image_patch.image_bytes(color_transform=color_transform)
@dataclasses.dataclass(frozen=True)
class PatchBounds:
  """A bounding rectangle of a patch, in pixel units."""

  # The upper leftmost x coordinate of the patch.
  x_origin: int
  # The upper leftmost y coordinate of the patch.
  y_origin: int
  # Patch extent along x.
  width: int
  # Patch extent along y.
  height: int
class BasePatch:
  """A rectangular patch/tile/view of an Image at a specific pixel spacing."""

  def __init__(
      self,
      x: int,
      y: int,
      width: int,
      height: int,
  ):
    """Stores the patch rectangle.

    Args:
      x: X coordinate of the upper-left corner of the patch.
      y: Y coordinate of the upper-left corner of the patch.
      width: Width of the patch.
      height: Height of the patch.
    """
    self._x, self._y = x, y
    self._width, self._height = width, height

  @property
  def x(self) -> int:
    """X coordinate of the upper-left corner."""
    return self._x

  @property
  def y(self) -> int:
    """Y coordinate of the upper-left corner."""
    return self._y

  @property
  def width(self) -> int:
    """Patch width."""
    return self._width

  @property
  def height(self) -> int:
    """Patch height."""
    return self._height

  @property
  def patch_bounds(self) -> PatchBounds:
    """Patch rectangle expressed as PatchBounds."""
    return PatchBounds(
        x_origin=self.x,
        y_origin=self.y,
        width=self.width,
        height=self.height,
    )

  def is_patch_fully_in_source_image_dim(self, width: int, height: int) -> bool:
    """Tests if the patch lies entirely inside an image of the given size.

    Args:
      width: Width of the image.
      height: Height of the image.

    Returns:
      True if the patch has a non-negative origin, positive dimensions, and
      does not extend beyond the image width or height.
    """
    has_valid_origin = self.x >= 0 and self.y >= 0
    has_valid_size = self.width > 0 and self.height > 0
    if not (has_valid_origin and has_valid_size):
      return False
    right_edge = self.x + self.width
    bottom_edge = self.y + self.height
    return right_edge <= width and bottom_edge <= height
def get_image_bytes_samples_per_pixel(image_bytes: np.ndarray) -> int:
  """Returns the number of samples per pixel in the image.

  Args:
    image_bytes: Uncompressed image bytes (e.g., 8 bit RGB)

  Returns:
    1 for a 2D array; the size of the third axis for a 3D array.

  Raises:
    ez_wsi_errors.GcsImageError: If the image is not 2D or 3D.
  """
  shape = image_bytes.shape
  rank = len(shape)
  if rank == 2:
    return 1
  if rank == 3:
    return shape[2]
  raise ez_wsi_errors.GcsImageError(
      f'Invalid image shape: {image_bytes.shape}. Image must be 2D or 3D.'
  )
def transform_image_bytes_color(
    image_bytes: np.ndarray,
    color_transform: Optional[ImageCms.ImageCmsTransform] = None,
) -> np.ndarray:
  """Transforms image bytes color using ICC Profile Transformation.

  Args:
    image_bytes: Uncompressed image pixels (2D or 3D array).
    color_transform: ICC profile transformation to apply, or None.

  Returns:
    The input array unchanged when no transform is given or the image has at
    most one sample per pixel; otherwise a new array holding the color
    transformed pixels.

  Raises:
    ez_wsi_errors.GcsImageError: If the image is not 2D or 3D.
  """
  # The shape is validated before the early exit so malformed input is
  # rejected even when no transform is requested.
  channel_count = get_image_bytes_samples_per_pixel(image_bytes)
  if color_transform is None or channel_count <= 1:
    return image_bytes
  rows, columns = image_bytes.shape[0:2]
  pil_image = PIL.Image.frombuffer(
      _RGB,
      (columns, rows),
      image_bytes.tobytes(),
      decoder_name=_RAW,
  )
  ImageCms.applyTransform(pil_image, color_transform, inPlace=True)
  return np.asarray(pil_image)
class _SlidePyramidLevelPatch(BasePatch):
  """A rectangular patch/tile/view of an Image at a specific pixel spacing.

  A Patch's data is composed from its overlap with one or more DICOM Frames.
  Patch representation here represents a region of pixels as stored in DICOM.
  """

  def __init__(
      self,
      source: _DicomSeries,
      level: slide_level_map.Level,
      x: int,
      y: int,
      width: int,
      height: int,
  ):
    """Constructor.

    Args:
      source: The source DICOM series this patch belongs to.
      level: Pyramid Level source imaging is derived from.
      x: The X coordinate of the starting point (upper-left corner) of the
        patch.
      y: The Y coordinate of the starting point (upper-left corner) of the
        patch.
      width: The width of the patch.
      height: The height of the patch.

    Raises:
      LevelNotFoundError: If the source does not have the level.
    """
    super().__init__(x, y, width, height)
    self._level = level
    self._source = source
    if not source.has_level(level):
      raise ez_wsi_errors.LevelNotFoundError(
          f'The level {level.level_index} is not found in the slide.'
      )

  def is_patch_fully_in_source_image(self) -> bool:
    """Returns True if the patch lies entirely within the level dimensions."""
    return self.is_patch_fully_in_source_image_dim(
        self._level.width, self._level.height
    )

  def get_pyramid_imaging_source_level(self) -> slide_level_map.Level:
    """Returns the pyramid level the patch pixels are read from."""
    return self._level

  @property
  def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Pixel spacing of the level the patch is read from."""
    return self._level.pixel_spacing

  def __eq__(self, other: Any) -> bool:
    """Patches are equal if bounds, source, and level all match."""
    if not isinstance(other, _SlidePyramidLevelPatch):
      return False
    return (
        self.x == other.x
        and self.y == other.y
        and self.width == other.width
        and self.height == other.height
        and (self._source is other._source or self._source == other._source)
        and (self._level is other._level or self._level == other._level)
    )

  @property
  def source(self) -> _DicomSeries:
    """DICOM series the patch belongs to."""
    return self._source

  def _get_intersection(
      self,
      frame_x_cord: int,
      frame_y_cord: int,
      frame_width: int,
      frame_height: int,
  ) -> _PatchIntersection:
    """Returns intersection from the source patch and rectangular coordinate.

    Args:
      frame_x_cord: Source frame x coordinate (upper left).
      frame_y_cord: Source frame y coordinate (upper left).
      frame_width: Source frame width
      frame_height: Source frame height

    Returns:
      Intersection between patches and rectangular coordinate.

    Raises:
      PatchIntersectionNotFoundError if there is no overlap between the source
      and the destination patches.
    """
    # Upper-left corner of the overlap is the larger of the two origins.
    x = max(frame_x_cord, self.x)
    y = max(frame_y_cord, self.y)
    # Overlap extent is bounded by whichever rectangle ends first.
    width = min(frame_x_cord + frame_width - x, self.x + self.width - x)
    height = min(frame_y_cord + frame_height - y, self.y + self.height - y)
    if width <= 0 or height <= 0:
      raise ez_wsi_errors.PatchIntersectionNotFoundError(
          'There is no overlap region between the source and the destination'
          ' patches.'
      )
    return _PatchIntersection(
        x_origin=x, y_origin=y, width=width, height=height
    )

  def _copy_overlapped_region(
      self, src_frame: Frame, dst_image_np: np.ndarray
  ) -> Tuple[int, int]:
    """Copies the overlapped region from the source patch to the dest nd.array.

    Args:
      src_frame: the source frame to be copied from.
      dst_image_np: the destination np to be copied to.

    Returns:
      The width and height of the overlapped region gets copied.

    Raises:
      PatchIntersectionNotFoundError if there is no overlap between the source
      and the destination patches.
    """
    intersection = self._get_intersection(
        src_frame.x_origin,
        src_frame.y_origin,
        src_frame.width,
        src_frame.height,
    )
    x = intersection.x_origin
    y = intersection.y_origin
    width = intersection.width
    height = intersection.height
    # Source offsets are relative to the frame origin; destination offsets are
    # relative to the patch origin.
    # pylint: disable=protected-access
    _copy_ndarray(
        src_frame.image_np,
        x - src_frame.x_origin,
        y - src_frame.y_origin,
        width,
        height,
        dst_image_np,
        x - self.x,
        y - self.y,
    )
    # pylint: enable=protected-access
    return width, height

  def frame_numbers(self) -> Iterator[int]:
    """Generates slide level frame numbers required to render patch.

    Frame numbering starts at 1.

    A patch with no columns inside the level (its x origin is at or beyond the
    level width, or its width is not positive) yields no frame numbers.

    Yields:
      A generator that produces frame numbers.

    Raises:
      DicomSlideMissingError if slide used to create self is None.
    """
    if self.source is None:
      raise ez_wsi_errors.DicomSlideMissingError(
          'Unable to get image pixels. Parent slide is None.'
      )
    y = self.y
    x = self.x
    width = self.width
    height = self.height
    cy = y
    frame_width = self._level.frame_width
    frame_height = self._level.frame_height
    cached_instance = None
    last_instance_frame_number = -math.inf
    # Walk the patch row by row, stepping by the part of each frame that
    # overlaps the patch.
    while cy < y + height and cy < self._level.height:
      cx = x
      region_height = 0
      while cx < x + width and cx < self._level.width:
        try:
          frame_number = self._level.get_frame_number_by_point(cx, cy)
          # Look up the instance again only when the frame number falls past
          # the end of the cached instance.
          if (
              cached_instance is None
              or frame_number - cached_instance.frame_offset
              >= cached_instance.frame_count
          ):
            cached_instance = self._level.get_instance_by_frame(frame_number)
            last_instance_frame_number = -math.inf
            if cached_instance is None:
              raise ez_wsi_errors.FrameNumberOutofBoundsError()
          if frame_number > last_instance_frame_number:
            yield frame_number
            last_instance_frame_number = frame_number
          pos_x, pos_y = self._level.get_frame_position(frame_number)
          intersection = self._get_intersection(
              pos_x, pos_y, frame_width, frame_height
          )
          region_width = intersection.width
          region_height = intersection.height
        except ez_wsi_errors.EZWsiError:
          # No frame found at (cx, cy), move 1 pixel in both X, and Y direction.
          region_width = 1
          region_height = max(1, region_height)
        cx += region_width
      if region_height <= 0:
        # The column loop did not run, so the patch has no columns inside the
        # level. x and width are the same for every row, so no later row can
        # contribute frames either. Stop here; advancing cy by 0 would loop
        # forever.
        return
      cy += region_height

  def image_bytes(
      self, color_transform: Optional[ImageCms.ImageCmsTransform] = None
  ) -> np.ndarray:
    """Returns the patch's image bytes.

    Regions of the patch not covered by any frame are left zero filled.

    Args:
      color_transform: Optional ICC Profile color transformation to apply to on
        image bytes.

    Returns:
      Numpy array type image.

    Raises:
      DicomSlideMissingError if slide used to create self is None.
      SectionOutOfImageBoundsError if no pixels could be copied from any frame
      into the patch.
    """
    if self._source is None:
      raise ez_wsi_errors.DicomSlideMissingError(
          'Unable to get image pixels. Parent slide is None.'
      )
    image_bytes = np.zeros(
        (self.height, self.width, self._level.samples_per_pixel),
        self._level.pixel_format,
    )
    pixel_copied = False
    # Copies image pixels from all overlapped frames.
    for frame_number in self.frame_numbers():
      frame = self.source.get_frame(self._level, frame_number)
      if frame is None:
        continue
      try:
        self._copy_overlapped_region(frame, image_bytes)
        pixel_copied = True
      except ez_wsi_errors.EZWsiError:
        # Frames that cannot be copied are skipped; the patch is rendered from
        # whichever frames succeed.
        continue
    if not pixel_copied:
      raise ez_wsi_errors.SectionOutOfImageBoundsError(
          'The requested patch is out of scope of the image.'
      )
    return transform_image_bytes_color(image_bytes, color_transform)

  def get_gcp_data_credential_header(
      self, credential: Optional[google.auth.credentials.Credentials] = None
  ) -> Dict[str, str]:
    """Returns the credential header patch requests."""
    return self.source.get_credential_header(credential)
def _scale_coordinate(pt: int, source_width: int, dest_width: int) -> int:
"""Scale coordinate from source to destination."""
return int(int(pt) * int(dest_width) / int(source_width))
def _bottom_offset_padding(pt: int, source_width: int, dest_width: int) -> int:
"""Returns starting offset when upsampling.
Returns 0 if downsampling. Otherwise determines the fraction of the first
upsampled pixel which falls within the upsampled image.
Args:
pt: Coordinate of pixel in source imaging.
source_width: Width of source imaging.
dest_width: Width of destination imaging.
Returns:
0 if imaging is being downsampled or clipping pixel offset for first
upsampled pixel.
"""
if source_width >= dest_width:
return 0
sf = int(source_width) / int(dest_width)
fractional_pt = float(int(pt) * int(source_width) / int(dest_width))
padding = int((fractional_pt - math.floor(fractional_pt)) / sf)
return max(padding, 0)
def _top_offset_padding(pt: int, source_width: int, dest_width: int) -> bool:
"""Returns true if last upsampled pixel partially falls on image.
Returns false if downsampling. Otherwise determines i the last
upsampled pixel which falls only partially within the upsampled image bounds.
Args:
pt: Coordinate of pixel in source imaging.
source_width: Width of source imaging.
dest_width: Width of destination imaging.
Returns:
False if imaging is being downsampled or True if last pixel only partially
falls within image patch.
"""
if source_width >= dest_width:
return False
sf = int(source_width) / int(dest_width)
fractional_pt = float(int(pt) * int(source_width) / int(dest_width))
padding = int((math.ceil(fractional_pt) - fractional_pt) / sf)
return max(padding, 0) > 0
class DicomPatch(_SlidePyramidLevelPatch):
  """A rectangular patch/tile/view of an Image at a specific pixel spacing.

  Abstraction over, _SlidePyramidLevelPatch that supports resizing source
  pyramid imaging to target pixel spacing.
  """

  def __init__(
      self,
      source_image_level: slide_level_map.Level,
      x: int,
      y: int,
      width: int,
      height: int,
      source: _DicomSeries,
      destination_image_level: Union[
          slide_level_map.Level, slide_level_map.ResizedLevel, None
      ] = None,
      require_fully_in_source_image: bool = False,
  ):
    """Constructor.

    Args:
      source_image_level: Level that slide source imaging is generated from.
      x: The X coordinate of the starting point (upper-left corner) of the
        generated patch in destination image level coordinates.
      y: The Y coordinate of the starting point (upper-left corner) of the
        generated patch in destination image level coordinates.
      width: The width of the generated patch in destination image.
      height: The height of the generated patch in destination image.
      source: The parent DICOM Slide this patch belongs to.
      destination_image_level: Level that patch represents if undefined defaults
        to source level.
      require_fully_in_source_image: Require patch be fully in image

    Raises:
      DicomPatchGenerationError: If the source level is not tiled_full and does
        not have exactly one frame.
      PatchOutsideOfImageDimensionsError: If require_fully_in_source_image is
        True and the patch is not fully within the source image.
    """
    if (
        not source_image_level.tiled_full
        and source_image_level.number_of_frames != 1
    ):
      raise ez_wsi_errors.DicomPatchGenerationError(
          'DICOM instance(s) do not have TILED_FULL Dimension Organization'
          ' Type.'
      )
    super().__init__(source, source_image_level, x, y, width, height)
    if destination_image_level is None:
      destination_image_level = source_image_level
    self._destination_image_level = destination_image_level
    # Map the patch corners from destination level coordinates into source
    # level coordinates.
    source_start_x = _scale_coordinate(
        x, destination_image_level.width, source_image_level.width
    )
    source_start_y = _scale_coordinate(
        y, destination_image_level.height, source_image_level.height
    )
    source_end_x = _scale_coordinate(
        x + width, destination_image_level.width, source_image_level.width
    )
    source_end_y = _scale_coordinate(
        y + height, destination_image_level.height, source_image_level.height
    )
    if (
        source_start_x == x
        and source_start_y == y
        and source_end_x == x + width
        and source_end_y == y + height
    ):
      # patch imaging is not being resized.
      self._resized_patch_source = None
      self._pixel_spacing = source_image_level.pixel_spacing
      self._bottom_x_offset_pad = 0  # Not used in if dimensions unchanged.
      self._bottom_y_offset_pad = 0  # Not used
      self._upsample_target_width = width  # Not used
      self._upsample_target_height = height  # Not used
    else:
      # patch imaging is being resized.
      # determine offsets for image upsampling. NOPs for downsampling.
      self._bottom_x_offset_pad = _bottom_offset_padding(
          x, source_image_level.width, destination_image_level.width
      )
      self._bottom_y_offset_pad = _bottom_offset_padding(
          y, source_image_level.height, destination_image_level.height
      )
      # if patch bounds end before end of image and there is an additional
      # pixel in source which fractionally falls on the patch, then include.
      if source_end_x < source_image_level.width and _top_offset_padding(
          x + width, source_image_level.width, destination_image_level.width
      ):
        source_end_x += 1
      # if patch bounds end before end of image and there is an additional
      # pixel in source which fractionally falls on the patch, then include.
      if source_end_y < source_image_level.height and _top_offset_padding(
          y + height,
          source_image_level.height,
          destination_image_level.height,
      ):
        source_end_y += 1
      self._pixel_spacing = destination_image_level.pixel_spacing
      source_width = source_end_x - source_start_x
      source_height = source_end_y - source_start_y
      # Region of the source level whose pixels are resized to produce the
      # patch.
      self._resized_patch_source = _SlidePyramidLevelPatch(
          source,
          source_image_level,
          source_start_x,
          source_start_y,
          source_width,
          source_height,
      )
      # determine dimensions to rescale upsampled image to before clipping
      self._upsample_target_width = _scale_coordinate(
          source_start_x + source_width,
          source_image_level.width,
          destination_image_level.width,
      ) - _scale_coordinate(
          source_start_x,
          source_image_level.width,
          destination_image_level.width,
      )
      self._upsample_target_height = _scale_coordinate(
          source_start_y + source_height,
          source_image_level.height,
          destination_image_level.height,
      ) - _scale_coordinate(
          source_start_y,
          source_image_level.height,
          destination_image_level.height,
      )
    if (
        require_fully_in_source_image
        and not self.is_patch_fully_in_source_image()
    ):
      raise ez_wsi_errors.PatchOutsideOfImageDimensionsError(
          'A portion of the patch does not overlap the image.'
      )
    # Stored so that patches derived via get_patch can default to this setting.
    self._require_fully_in_source_image = require_fully_in_source_image

  def __eq__(self, other: Any) -> bool:
    """Patches are equal if bounds, source, and both levels match."""
    if not isinstance(other, DicomPatch):
      return False
    return (
        self.x == other.x
        and self.y == other.y
        and self.width == other.width
        and self.height == other.height
        and (self.source is other.source or self.source == other.source)
        and (self._level is other._level or self._level == other._level)
        and (
            self._destination_image_level is other._destination_image_level
            or self._destination_image_level == other._destination_image_level
        )
    )

  @property
  def id(self) -> str:
    """Identifier built from magnification, patch size, and patch origin.

    Prefixed with the source accession number when the source defines one.
    """
    if not self.source or not self.source.accession_number:
      return (
          f'M_{self.pixel_spacing.as_magnification_string}:'
          f'{self.width:06d}x{self.height:06d}{self.x:+07d}{self.y:+07d}'
      )
    # Be consistent with internal id format.
    # "%s:%06dx%06d%+07d%+07d", image_id, width, height, left, top
    return (
        f'{self.source.accession_number}:M_{self.pixel_spacing.as_magnification_string}:'
        f'{self.width:06d}x{self.height:06d}{self.x:+07d}{self.y:+07d}'
    )

  def is_patch_fully_in_source_image(self) -> bool:
    """Returns True if the pixels backing the patch lie within source imaging.

    For resized patches the test is applied to the source level region that the
    patch is generated from.
    """
    if self._resized_patch_source is None:
      return super().is_patch_fully_in_source_image()
    return self._resized_patch_source.is_patch_fully_in_source_image()

  @property
  def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Pixel spacing of the patch.

    Source level pixel spacing when the patch is not resized, otherwise the
    destination level pixel spacing.
    """
    return self._pixel_spacing

  def frame_numbers(self) -> Iterator[int]:
    """Generates source level frame numbers required to render the patch."""
    if self._resized_patch_source is None:
      return super().frame_numbers()
    # returns frame numbers in source image level.
    return self._resized_patch_source.frame_numbers()

  @property
  def is_resized(self) -> bool:
    """True if patch pixels are produced by resizing source level imaging."""
    return self._resized_patch_source is not None

  @property
  def level(
      self,
  ) -> Union[slide_level_map.Level, slide_level_map.ResizedLevel]:
    """Level patch bytes are generated with respect to."""
    return self._destination_image_level

  def image_bytes(
      self, color_transform: Optional[ImageCms.ImageCmsTransform] = None
  ) -> np.ndarray:
    """Returns the patch's image bytes, resized from source imaging if needed.

    Args:
      color_transform: Optional ICC Profile color transformation; applied to
        the source pixels before any resizing.

    Returns:
      Numpy array type image.
    """
    if self._resized_patch_source is None:
      # no change to byte dimensions.
      return super().image_bytes(color_transform=color_transform)
    source_image_bytes = self._resized_patch_source.image_bytes(
        color_transform=color_transform
    )
    if (
        self.width > self._resized_patch_source.width
        or self.height > self._resized_patch_source.height
    ):
      # upsample bytes
      pixels = cv2.resize(
          source_image_bytes,
          (self._upsample_target_width, self._upsample_target_height),
          interpolation=cv2.INTER_CUBIC,
      )
      # clip regions of upsampled imaging not falling in patch.
      return pixels[
          self._bottom_y_offset_pad : (self.height + self._bottom_y_offset_pad),
          self._bottom_x_offset_pad : (self.width + self._bottom_x_offset_pad),
          ...,
      ]
    else:
      # Downsample bytes
      return cv2.resize(
          source_image_bytes,
          (self.width, self.height),
          interpolation=cv2.INTER_AREA,
      )

  def get_patch(
      self,
      x: int,
      y: int,
      width: int,
      height: int,
      require_fully_in_source_image: Optional[bool] = None,
  ) -> DicomPatch:
    """Returns a patch based from the existing patch.

    The new patch shares this patch's source, source level, and destination
    level.

    Args:
      x: X coordinate of the upper-left corner of the new patch.
      y: Y coordinate of the upper-left corner of the new patch.
      width: Width of the new patch.
      height: Height of the new patch.
      require_fully_in_source_image: If None, the value this patch was
        constructed with is used.

    Returns:
      New DicomPatch.
    """
    require_fully_in_source_image = (
        self._require_fully_in_source_image
        if require_fully_in_source_image is None
        else require_fully_in_source_image
    )
    return DicomPatch(
        self._level,
        x,
        y,
        width,
        height,
        self.source,
        self._destination_image_level,
        require_fully_in_source_image=require_fully_in_source_image,
    )
def _get_native_level(
    slm: slide_level_map.SlideLevelMap,
) -> Optional[slide_level_map.Level]:
  """Gets the native level from a SlideLevelMap.

  The native level of a slide has the lowest level index.

  Args:
    slm: The source SlideLevelMap to get the native level from.

  Returns:
    The level that has the lowest index in the slide, or None if the map has no
    minimum level index or no level is found at that index.
  """
  if slm.level_index_min is None:
    return None
  native_level = slm.get_level(slm.level_index_min)
  return native_level if native_level is not None else None
def _copy_ndarray(
src_array: np.ndarray,
src_x: int,
src_y: int,
width: int,
height: int,
dst_array: np.ndarray,
dst_x: int,
dst_y: int,
) -> None:
"""Copies a sub-array of an ndarray to another ndarray.
Args:
src_array: The source ndarray to copy the sub-array from.
src_x: The X coordinate of the starting point to copy from, in the source
array.
src_y: The Y coordinate of the starting point to copy from, in the source
array.
width: The width of the sub-array to copy.
height: The height of the sub-array to copy.
dst_array: The destination ndarray to copy the sub-array into.
dst_x: The X coordinate of the starting point to copy to, in the destination
array.
dst_y: The Y coordinate of the starting point to copy, in the destination
array.
Raises:
SectionOutOfImageBoundsError if the sub-array to copy is out of scope of the
source array or the destination array.
"""
src_height, src_width = src_array.shape[:2]
dst_height, dst_width = dst_array.shape[:2]
if (
src_x < 0
or src_y < 0
or src_x + width > src_width
or src_y + height > src_height
):
raise ez_wsi_errors.SectionOutOfImageBoundsError(
'The sub-array to copy is out of the scope of the source array.'
)
if (
dst_x < 0
or dst_y < 0
or dst_x + width > dst_width
or dst_y + height > dst_height
):
raise ez_wsi_errors.SectionOutOfImageBoundsError(
'The sub-array to copy is out of the scope of the destination array.'
)
dst_array[dst_y : (dst_y + height), dst_x : (dst_x + width)] = src_array[
src_y : (src_y + height), src_x : (src_x + width)
]
def create_icc_profile_transformation(
    source_image_icc_profile_bytes: bytes,
    dest_icc_profile: Union[bytes, ImageCms.core.CmsProfile],
    rendering_intent: ImageCms.Intent = ImageCms.Intent.PERCEPTUAL,
) -> Optional[ImageCms.ImageCmsTransform]:
  """Returns transformation from the pyramid colorspace to an ICC profile.

  Args:
    source_image_icc_profile_bytes: Source image icc profile bytes.
    dest_icc_profile: ICC Profile to transform DICOM Pyramid imaging to; either
      raw profile bytes or an already opened profile.
    rendering_intent: Rendering intent to use in transformation.

  Returns:
    RGB to RGB transformation for pixel imaging, or None if either the source
    profile bytes or the destination profile is empty or undefined.
  """
  if not (source_image_icc_profile_bytes and dest_icc_profile):
    return None
  source_profile = ImageCms.getOpenProfile(
      io.BytesIO(source_image_icc_profile_bytes)
  )
  # A destination given as raw bytes is opened as a profile first.
  if isinstance(dest_icc_profile, bytes):
    dest_icc_profile = ImageCms.getOpenProfile(io.BytesIO(dest_icc_profile))
  return ImageCms.buildTransform(
      source_profile,
      dest_icc_profile,
      _RGB,
      _RGB,
      renderingIntent=rendering_intent,
  )
def _get_level_series_path(
    level: Union[slide_level_map.Level, slide_level_map.ResizedLevel],
) -> dicom_path.Path:
  """Returns the series path of the first instance in a level.

  Args:
    level: Pyramid level; for a resized level its source level is used.

  Returns:
    Series path derived from the path of the first instance in the level.
  """
  source_level = (
      level.source_level
      if isinstance(level, slide_level_map.ResizedLevel)
      else level
  )
  first_instance = next(iter(source_level.instances.values()))
  return first_instance.path.GetSeriesPath()
class _DicomSeries(metaclass=abc.ABCMeta):
"""DICOM images."""
def __init__(
self,
dwi: dicom_web_interface.DicomWebInterface,
path: dicom_path.Path,
enable_client_slide_frame_decompression: bool = True,
accession_number: Optional[str] = None,
logging_factory: Optional[
ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
] = None,
slide_frame_cache: Optional[
local_dicom_slide_cache.InMemoryDicomSlideCache
] = None,
):
"""Constructor.
Args:
dwi: The DicomWebInterface that has been configured to be able to access
the series referenced by the input path.
path: The path to a DICOM series object in a DICOMStore.
enable_client_slide_frame_decompression: Set to True to enable client side
frame decompression optimization (Recommended value = True); remove
parameter following GG validation.
accession_number: The accession_number of the slide.
logging_factory: The factory that EZ WSI uses to construct a logging
interface.
slide_frame_cache: Initialize to use shared slide cache.
"""
self._logger = None
if logging_factory is None:
self._logging_factory = ez_wsi_logging_factory.BasePythonLoggerFactory(
ez_wsi_logging_factory.DEFAULT_EZ_WSI_PYTHON_LOGGER_NAME
)
else:
self._logging_factory = logging_factory
self._dwi = dwi
self.path = path
self.accession_number = accession_number
self._slide_frame_cache = slide_frame_cache
self._enable_client_slide_frame_decompression = (
enable_client_slide_frame_decompression
)
@abc.abstractmethod
def json_metadata_dict(
self,
level_subset: Optional[List[slide_level_map.Level]] = None,
max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
) -> Mapping[str, Any]:
"""Returns JSON dict metadata for the series."""
def json_metadata(
self,
level_subset: Optional[List[slide_level_map.Level]] = None,
max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
) -> str:
"""Returns JSON encoded metadata for the series."""
return json.dumps(
self.json_metadata_dict(
level_subset=level_subset,
max_json_encoded_icc_profile_size=max_json_encoded_icc_profile_size,
)
)
def get_credentials(self) -> google.auth.credentials.Credentials:
return self._dwi.credentials()
def get_credential_header(
self,
credential: Optional[google.auth.credentials.Credentials] = None,
) -> Dict[str, str]:
"""Returns credential header for retrieval of DICOM store."""
headers = {}
if credential is None:
credential = self.get_credentials()
else:
credential_factory.refresh_credentials(credential)
credential.apply(headers)
return headers
def get_patch_bounds_dicom_instance_frame_numbers(
    self,
    image_level: Union[
        slide_level_map.Level,
        slide_level_map.ResizedLevel,
    ],
    patch_bounds_list: List[PatchBounds],
) -> Mapping[str, List[int]]:
  """Returns Map[DICOM instance path: frame numbers] needed by the patches.

  Args:
    image_level: Level, or resized level, that the patch bounds refer to. For
      a resized level, frames are resolved on its source level.
    patch_bounds_list: List of PatchBounds to return frame numbers for.

  Returns:
    Mapping from instance path (as a string) to the list of instance frame
    numbers required to render the patches. Frame numbers for which the level
    has no instance are omitted.
  """
  source_image_level = (
      image_level.source_level
      if isinstance(image_level, slide_level_map.ResizedLevel)
      else image_level
  )
  # One list of whole-slide frame numbers per patch. The merge below relies on
  # each list already being sorted (as noted by the original implementation).
  per_patch_frame_numbers = [
      DicomPatch(
          source_image_level,
          bounds.x_origin,
          bounds.y_origin,
          bounds.width,
          bounds.height,
          self,
          image_level,
      ).frame_numbers()
      for bounds in patch_bounds_list
  ]
  frames_by_instance = {}
  pending_frames = []
  instance = None
  # heapq.merge combines the pre-sorted lists into one sorted stream; the
  # stream can contain duplicates, which are dropped below.
  for frame_number in heapq.merge(*per_patch_frame_numbers):
    needs_instance_lookup = (
        instance is None
        or instance.instance_frame_number_from_wholes_slide_frame_number(  # pytype: disable=attribute-error
            frame_number
        )
        > instance.frame_count  # pytype: disable=attribute-error
    )
    if needs_instance_lookup:
      # Frame lies beyond the current instance: flush what was collected for
      # it and look up the instance that holds this frame.
      if pending_frames:
        frames_by_instance[str(instance.dicom_object.path)] = (  # pytype: disable=attribute-error
            pending_frames
        )
      instance = source_image_level.get_instance_by_frame(frame_number)
      if instance is None:
        pending_frames = []
      else:
        pending_frames = [
            instance.instance_frame_number_from_wholes_slide_frame_number(
                frame_number
            )
        ]
      continue
    instance_frame_number = (
        instance.instance_frame_number_from_wholes_slide_frame_number(
            frame_number
        )
    )
    if pending_frames and pending_frames[-1] == instance_frame_number:
      # Same frame requested by more than one patch.
      continue
    pending_frames.append(instance_frame_number)
  if pending_frames:
    frames_by_instance[str(instance.dicom_object.path)] = pending_frames
  return frames_by_instance
def preload_level_in_frame_cache(
    self,
    level: Union[slide_level_map.Level, slide_level_map.ResizedLevel],
    blocking: bool = True,
):
  """Preloads an entire level into the slide frame cache.

  Does nothing when the slide has no frame cache.

  Args:
    level: Level or resized level to load; a resized level is replaced by its
      source level.
    blocking: Passed to the cache; if the method should block until loading
      is complete.

  Returns:
    None

  Raises:
    ez_wsi_errors.LevelNotFoundError: The level's series path differs from
      the slide's series path.
  """
  frame_cache = self.slide_frame_cache
  if frame_cache is None:
    return
  source_level = (
      level.source_level
      if isinstance(level, slide_level_map.ResizedLevel)
      else level
  )
  # The level must belong to the series this slide represents.
  level_series_path = _get_level_series_path(source_level)
  if level_series_path != self.path.GetSeriesPath():
    raise ez_wsi_errors.LevelNotFoundError(
        'Level path does not match DICOM slide path.'
    )
  frame_cache.cache_whole_instance_in_memory(source_level, blocking)
def preload_patches_in_frame_cache(
    self,
    patch_seq: Union[Sequence[DicomPatch], DicomPatch],
    blocking: bool = True,
    copy_from_cache: Optional[
        local_dicom_slide_cache.InMemoryDicomSlideCache
    ] = None,
) -> None:
  """Pre-loads the frames needed by a sequence of patches into the cache.

  Consecutive patches that share a level are submitted to the cache together.
  Does nothing when the slide has no frame cache or patch_seq is empty.

  Args:
    patch_seq: Patch or sequence of patches to load in the frame cache.
    blocking: If method should block until loading is complete.
    copy_from_cache: Optional cache handed to the frame cache so frames can be
      copied from it instead of being downloaded again.

  Returns:
    None

  Raises:
    ez_wsi_errors.LevelNotFoundError: A patch level's series path differs
      from the slide's series path.
  """
  frame_cache = self.slide_frame_cache
  if frame_cache is None or not patch_seq:
    return
  if isinstance(patch_seq, DicomPatch):
    patch_seq = [patch_seq]
  slide_series_path = self.path.GetSeriesPath()

  def _preload_group(group_level, group_patches):
    # Validates the group's level, then requests its frames from the cache.
    if _get_level_series_path(group_level) != slide_series_path:
      raise ez_wsi_errors.LevelNotFoundError(
          'Patch path does not match DICOM slide path.'
      )
    frame_map = self.get_patch_bounds_dicom_instance_frame_numbers(
        group_level, [member.patch_bounds for member in group_patches]
    )
    frame_cache.preload_instance_frame_numbers(frame_map, copy_from_cache)

  current_level = None
  group = []
  for patch in patch_seq:
    # A change of level closes the running group before the patch is added.
    if current_level is not None and not current_level == patch.level:
      _preload_group(current_level, group)
      group = []
    group.append(patch)
    current_level = patch.level
  if group:
    _preload_group(current_level, group)
  if blocking:
    frame_cache.block_until_frames_are_loaded()
@property
def logger(self) -> ez_wsi_logging_factory.AbstractLoggingInterface:
  """Logging interface for the slide; created by the factory on first use."""
  existing = self._logger
  if existing is not None:
    return existing
  self._logger = self._logging_factory.create_logger()
  return self._logger
@property
def slide_frame_cache(
    self,
) -> Optional[local_dicom_slide_cache.InMemoryDicomSlideCache]:
  """Frame cache currently attached to the slide, or None if there is none."""
  attached_cache = self._slide_frame_cache
  return attached_cache
@slide_frame_cache.setter
def slide_frame_cache(
    self,
    slide_frame_cache: Optional[
        local_dicom_slide_cache.InMemoryDicomSlideCache
    ],
) -> None:
  """Attaches a frame cache to the slide, replacing any current one.

  A cache configured with max_cache_frame_memory_lru_cache_size_bytes can be
  shared to bound total cache memory across multiple stores. Sharing non-LRU
  frame caches is not recommended.

  Args:
    slide_frame_cache: Cache to attach, or None to detach the current cache.
  """
  new_cache = slide_frame_cache
  self._slide_frame_cache = new_cache
def init_slide_frame_cache(
    self,
    max_cache_frame_memory_lru_cache_size_bytes: Optional[int] = None,
    number_of_frames_to_read: int = local_dicom_slide_cache.DEFAULT_NUMBER_OF_FRAMES_TO_READ_ON_CACHE_MISS,
    max_instance_number_of_frames_to_prefer_whole_instance_download: int = local_dicom_slide_cache.MAX_INSTANCE_NUMBER_OF_FRAMES_TO_PREFER_WHOLE_INSTANCE_DOWNLOAD,
    optimization_hint: local_dicom_slide_cache_types.CacheConfigOptimizationHint = local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_DICOM_STORE_QPM,
) -> local_dicom_slide_cache.InMemoryDicomSlideCache:
  """Creates a new frame cache and attaches it to the slide.

  The cache is built with the credential factory of the slide's DICOMweb
  interface and the slide's logging factory. Any previously attached cache is
  replaced.

  Args:
    max_cache_frame_memory_lru_cache_size_bytes: Maximum size of cache in
      bytes. Ideally in the hundreds-of-megabytes to gigabyte range. If None,
      the size is not limited.
    number_of_frames_to_read: Number of frames to read on cache miss.
    max_instance_number_of_frames_to_prefer_whole_instance_download: Max
      number of frames at which downloading a whole instance is preferred
      over retrieving frames in batch (typically faster for small instances).
      The optimal threshold depends on the average size of instance frame
      data and the size of non-frame instance metadata.
    optimization_hint: Optimize cache to minimize data latency or total
      queries to the DICOM store.

  Returns:
    The newly created frame cache.
  """
  new_cache = local_dicom_slide_cache.InMemoryDicomSlideCache(
      credential_factory=self._dwi.credential_factory,
      max_cache_frame_memory_lru_cache_size_bytes=max_cache_frame_memory_lru_cache_size_bytes,
      number_of_frames_to_read=number_of_frames_to_read,
      max_instance_number_of_frames_to_prefer_whole_instance_download=max_instance_number_of_frames_to_prefer_whole_instance_download,
      optimization_hint=optimization_hint,
      logging_factory=self._logging_factory,
  )
  self._slide_frame_cache = new_cache
  return new_cache
def remove_slide_frame_cache(self) -> None:
  """Detaches the frame cache from the slide."""
  no_cache = None
  self._slide_frame_cache = no_cache
@property
def dwi(self) -> dicom_web_interface.DicomWebInterface:
  """DICOMweb interface the slide uses to reach the DICOM store."""
  dicom_web = self._dwi
  return dicom_web
def _get_cached_frame_bytes(
    self,
    instance: slide_level_map.Instance,
    frame_number: int,
) -> Optional[bytes]:
  """Returns frame bytes from the frame cache if possible.

  Args:
    instance: Instance to return frame from.
    frame_number: Instance frame number to return.

  Returns:
    Result of the cache lookup, or None when the slide has no frame cache or
    the instance's transfer syntax is not supported for client side pixel
    decoding.
  """
  frame_cache = self._slide_frame_cache
  if frame_cache is None:
    return None
  transfer_syntax_uid = instance.dicom_object.get_value(
      tags.TRANSFER_SYNTAX_UID
  )
  if not is_client_side_pixel_decoding_supported(transfer_syntax_uid):
    return None
  return frame_cache.get_frame(
      instance.dicom_object.path,
      instance.frame_count,
      frame_number,
  )
def _get_frame_bytes_from_server(
    self,
    instance: slide_level_map.Instance,
    instance_frame_index: int,
    transcoding: dicom_web_interface.TranscodeDicomFrame,
) -> bytes:
  """Returns frame bytes retrieved through the DICOMweb interface.

  When the slide has a frame cache, previously retrieved bytes are looked up
  under a key made of the instance path, frame index and transcoding value,
  and newly retrieved bytes are stored under that key.

  Args:
    instance: DICOM instance.
    instance_frame_index: Frame index to retrieve.
    transcoding: How to transcode DICOM frames.

  Returns:
    Frame bytes.
  """
  instance_path = instance.dicom_object.path
  cache_key = (
      f'i:{str(instance_path)} f:{instance_frame_index} t:{transcoding.value}'
  )
  frame_cache = self._slide_frame_cache
  if frame_cache is not None:
    cached_bytes = frame_cache.get_cached_externally_acquired_bytes(cache_key)
    if cached_bytes is not None:
      return cached_bytes
  fetched_bytes = self.dwi.get_frame_image(
      instance_path, instance_frame_index, transcoding
  )
  # Re-read the attribute: the cache is checked again after the fetch.
  frame_cache = self._slide_frame_cache
  if frame_cache is not None:
    frame_cache.cache_externally_acquired_bytes(cache_key, fetched_bytes)
  return fetched_bytes
def _get_frame_client_transcoding(
    self,
    instance: slide_level_map.Instance,
    frame_number: int,
) -> Optional[np.ndarray]:
  """Returns a DICOM frame decoded on the client.

  The frame's bytes are taken from the frame cache when available, otherwise
  requested from the server without transcoding, and then decoded with
  dicom_frame_decoder using the instance's transfer syntax.

  Args:
    instance: DICOM instance within level to return frame from.
    frame_number: Frame number within the instance to return.

  Returns:
    Result of dicom_frame_decoder.decode_dicom_compressed_frame_bytes. The
    caller (get_frame) treats None as "unable to decode".
  """
  frame_bytes = self._get_cached_frame_bytes(instance, frame_number)
  if frame_bytes is None:
    frame_bytes = self._get_frame_bytes_from_server(
        instance,
        frame_number,
        dicom_web_interface.TranscodeDicomFrame.DO_NOT_TRANSCODE,
    )
  transfer_syntax_uid = instance.dicom_object.get_value(
      tags.TRANSFER_SYNTAX_UID
  )
  return dicom_frame_decoder.decode_dicom_compressed_frame_bytes(
      frame_bytes, transfer_syntax_uid
  )
def _get_frame_server_transcoding(
    self,
    level: slide_level_map.Level,
    instance: slide_level_map.Instance,
    frame_number: int,
) -> np.ndarray:
  """Returns a DICOM frame as uncompressed pixels, transcoded by the server.

  Args:
    level: WSI pyramid level.
    instance: DICOM instance within level to return frame from.
    frame_number: Frame number within the instance to return.

  Returns:
    Numpy array of shape (frame_height, frame_width, samples_per_pixel) in
    the level's pixel format.
  """
  frame_raw_bytes = None
  # Cached bytes are only usable when the level is stored in a raw transfer
  # syntax (the original comment says uncompressed little endian).
  if (
      level.transfer_syntax_uid
      in _SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS
  ):
    frame_raw_bytes = self._get_cached_frame_bytes(instance, frame_number)
  if frame_raw_bytes is None:
    frame_raw_bytes = self._get_frame_bytes_from_server(
        instance,
        frame_number,
        dicom_web_interface.TranscodeDicomFrame.UNCOMPRESSED_LITTLE_ENDIAN,
    )
  # Crop bytes if the DICOM buffer is padded beyond one frame of pixels.
  expected_size = int(
      level.frame_height
      * level.frame_width
      * level.samples_per_pixel
      * np.dtype(level.pixel_format).itemsize
  )
  if len(frame_raw_bytes) > expected_size:
    frame_raw_bytes = frame_raw_bytes[:expected_size]
  flat_pixels = np.frombuffer(frame_raw_bytes, level.pixel_format)
  return flat_pixels.reshape(
      (level.frame_height, level.frame_width, level.samples_per_pixel)
  )
def get_frame(
    self,
    level: slide_level_map.Level,
    frame_number: int,
) -> Optional[Frame]:
  """Gets a single frame of a level.

  The DICOMWeb API serves image pixels by the unit of frames. Frames have
  fixed size (width and height). Call get_patch() instead to get an image
  patch at a specific location, or with a specific dimension.

  Client side decoding is tried first when it is enabled and the level's
  transfer syntax can be decompressed; server side transcoding is the
  fallback.

  Args:
    level: Source pyramid level of the frame.
    frame_number: The frame number to be fetched, within
      [level.frame_number_min, level.frame_number_max]. The frames are stored
      in arrays with 1-based indexing.

  Returns:
    The requested frame, or None if the level has no instance for it.

  Raises:
    InputFrameNumberOutOfRangeError if the input frame_number is
    out of range.
  """
  within_level = (
      frame_number >= level.frame_number_min
      and frame_number <= level.frame_number_max
  )
  if not within_level:
    raise ez_wsi_errors.InputFrameNumberOutOfRangeError(
        f'frame_number value [{frame_number}] is out of range: '
        f'[{level.frame_number_min}, {level.frame_number_max}]'
    )
  instance = level.get_instance_by_frame(frame_number)
  if instance is None:
    # A frame may not exist in DICOMStore if it contains no tissue pixels.
    return None
  instance_frame_number = (
      instance.instance_frame_number_from_wholes_slide_frame_number(
          frame_number
      )
  )
  pos_x, pos_y = level.get_frame_position(frame_number)
  use_client_decoding = (
      self._enable_client_slide_frame_decompression
      and dicom_frame_decoder.can_decompress_dicom_transfer_syntax(
          level.transfer_syntax_uid
      )
  )
  pixels = None
  if use_client_decoding:
    pixels = self._get_frame_client_transcoding(
        instance, instance_frame_number
    )
  if pixels is None:
    # Either client decoding was not attempted or the bytes could not be
    # decoded (likely the pixel encoding does not match the DICOM transfer
    # syntax); fail over to server side transcoding.
    pixels = self._get_frame_server_transcoding(
        level, instance, instance_frame_number
    )
  return Frame(
      x_origin=pos_x,
      y_origin=pos_y,
      width=level.frame_width,
      height=level.frame_height,
      image_np=pixels,
  )
def get_image(
    self,
    pixel_spacing: Union[
        slide_level_map.Level,
        slide_level_map.ResizedLevel,
    ],
) -> DicomImage:
  """Returns a DicomImage of this series for a level or resized level.

  Args:
    pixel_spacing: Level or resized level the image represents.
  """
  image = DicomImage(pixel_spacing, self)
  return image
def get_patch(
    self,
    level: Union[
        slide_level_map.Level,
        slide_level_map.ResizedLevel,
    ],
    x: int,
    y: int,
    width: int,
    height: int,
    require_fully_in_source_image: bool = False,
) -> DicomPatch:
  """Gets a patch from a specific slide level.

  The area of a patch is defined by its position (x, y) and its dimension
  (width, height). The position corresponds to the first pixel in the patch
  and has the smallest coordinates. All coordinates are defined in the pixel
  space of the requested level.

  Args:
    level: Level to generate patch from. For a resized level the patch is
      sourced from the resized level's source level.
    x: The X coordinate of the patch position in level.
    y: The Y coordinate of the patch position in level.
    width: The width of the patch in level.
    height: The height of the patch in level.
    require_fully_in_source_image: Require patch dimensions fully exist in
      level.

  Returns:
    Returns the requested patch.
  """
  source_level = (
      level.source_level
      if isinstance(level, slide_level_map.ResizedLevel)
      else level
  )
  return DicomPatch(
      source_level,
      x,
      y,
      width,
      height,
      self,
      level,
      require_fully_in_source_image=require_fully_in_source_image,
  )
@abc.abstractmethod
def get_level_by_index(
    self, index: slide_level_map.LevelIndexType
) -> Optional[slide_level_map.Level]:
  """Returns the level by requested level index.

  Abstract; has_level() relies on this returning the very same Level object
  that the series holds for the index.

  Args:
    index: Index of the level to look up.

  Returns:
    The level registered under index. NOTE(review): the Optional return type
    suggests None is returned when no level has that index; confirm against
    the implementations.
  """
def has_level(self, level: slide_level_map.Level) -> bool:
  """Returns True if level is the object this series holds at its index.

  The comparison is by identity, not equality.

  Args:
    level: Level to test.
  """
  registered_level = self.get_level_by_index(level.level_index)
  return level is registered_level
@abc.abstractmethod
def are_instances_concatenated(self, instance_uids: list[str]) -> bool:
  """Returns True if the instances provided are concatenated.

  Abstract; concrete series classes supply the implementation.

  Args:
    instance_uids: SOP Instance UIDs of the instances to check.
  """
@property
@abc.abstractmethod
def all_levels(self) -> Iterator[slide_level_map.Level]:
  """Returns an iterator over all levels of the series.

  Abstract; get_instance_level() iterates this to search the instances of
  every level.
  """
def get_instance_level(
    self, sop_instance_uid: str
) -> Optional[slide_level_map.Level]:
  """Returns the level that contains the instance with the given UID.

  Args:
    sop_instance_uid: SOP Instance UID to search for.

  Returns:
    First level in all_levels with an instance whose SOP Instance UID equals
    sop_instance_uid; None if no level has one.
  """
  for level in self.all_levels:
    if any(
        candidate.dicom_object.get_value(tags.SOP_INSTANCE_UID)
        == sop_instance_uid
        for candidate in level.instances.values()
    ):
      return level
  return None
def _filter_dicom_object(
    self,
    instance: dicom_web_interface.DicomObject,
    slide_instances_uid_set: set[str],
    filter_by_annotation_iod: Optional[str],
    filter_by_operator_id: Optional[str],
) -> bool:
  """Filters instances by annotation IOD, referenced series UID, and operator ID.

  An instance passes when all of the following hold:
    * filter_by_annotation_iod is empty or equals the instance SOPClassUID.
    * The instance is in the slide's series, or any item of its
      ReferencedSeriesSequence names the slide's series, or any item of its
      ReferencedImageSequence names one of the slide's instances.
    * filter_by_operator_id is empty or equals the LongCodeValue of any
      PersonIdentificationCodeSequence item of any
      OperatorIdentificationSequence item.

  Missing or empty sequences, and items lacking the inspected attribute, are
  treated as "no match" rather than raising.

  Args:
    instance: DICOM object whose dicom_tags JSON is inspected.
    slide_instances_uid_set: SOP Instance UIDs of the slide's instances.
    filter_by_annotation_iod: SOP Class UID to require, or None/'' for any.
    filter_by_operator_id: Operator ID to require, or None/'' for any.

  Returns:
    True if the instance passes every filter.
  """
  ds = pydicom.Dataset.from_json(instance.dicom_tags)
  # Filter by provided IOD.
  if filter_by_annotation_iod and ds.SOPClassUID != filter_by_annotation_iod:
    return False

  # Sentinel so an absent attribute never compares equal to a real value.
  missing = object()
  series_uid = self.path.series_uid
  # Check if instance is referencing the slide's series or one of its
  # instances. Every sequence item is examined, not just the first.
  # https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/common-instance-reference/00081115/0020000e
  # https://dicom.innolitics.com/ciods/microscopy-bulk-simple-annotations/microscopy-bulk-simple-annotations/00081140/00081155
  reference_found = (
      ds.SeriesInstanceUID == series_uid
      or any(
          getattr(item, 'SeriesInstanceUID', missing) == series_uid
          for item in getattr(ds, 'ReferencedSeriesSequence', None) or ()
      )
      or any(
          getattr(item, 'ReferencedSOPInstanceUID', missing)
          in slide_instances_uid_set
          for item in getattr(ds, 'ReferencedImageSequence', None) or ()
      )
  )
  if not reference_found:
    return False

  # Filter by operator ID.
  if not filter_by_operator_id:
    return True
  for operator in getattr(ds, 'OperatorIdentificationSequence', None) or ():
    # An operator item without person codes must not hide later operators.
    person_codes = (
        getattr(operator, 'PersonIdentificationCodeSequence', None) or ()
    )
    for person_code in person_codes:
      if getattr(person_code, 'LongCodeValue', missing) == filter_by_operator_id:
        return True
  return False
def _find_annotation_instances(
    self,
    levels: Iterator[slide_level_map.Level],
    annotation_dicom_store: Optional[dicom_path.Path] = None,
    filter_by_annotation_iod: Optional[
        str
    ] = MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE,
    filter_by_operator_id: Optional[str] = None,
) -> Iterator[dicom_path.Path]:
  """Returns iterator over the paths of a slide's annotation instances.

  Instances with modality 'ANN' are queried from the slide's study and kept
  when _filter_dicom_object() accepts them. Filtering happens lazily, as the
  returned iterator is consumed.

  Args:
    levels: Levels whose instances count as the slide's instances when
      matching annotation references.
    annotation_dicom_store: The DICOM store to search for annotation
      instances; defaults to the slide's own store. The study UID is assumed
      to be the same in the slide and annotation DICOM stores.
    filter_by_annotation_iod: The annotation IOD to filter by. Default is
      Microscopy-Bulk-Simple-Annotations.
      https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
    filter_by_operator_id: The operator ID to filter by. This searches the
      Operator Identification Sequence long code value for the provided ID.
      https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/general-series/00081072/00401101/00080119

  Returns:
    An iterator that contains all of a slide's annotation instances.
  """
  store_path = annotation_dicom_store
  if store_path is None:
    store_path = self.path.GetStorePath()
  if store_path is None:
    # Assume annotations are stored in the same dicom store as the images.
    study_path = self.path.GetStudyPath()
  else:
    # Point the study path at the annotation store.
    study_path = dicom_path.FromPath(
        store_path,
        study_uid=self.path.study_uid,
        series_uid='',
        instance_uid='',
    )
  # Annotation-modality instances of the study.
  # https://www.dicomlibrary.com/dicom/modality
  annotation_candidates = self.dwi.get_instances(study_path, modality='ANN')
  # SOP Instance UIDs of every instance of this slide.
  slide_instance_uids = {
      slide_instance.dicom_object.get_value(tags.SOP_INSTANCE_UID)
      for level in levels
      for slide_instance in level.instances.values()
  }
  accepted = (
      candidate
      for candidate in annotation_candidates
      if self._filter_dicom_object(
          candidate,
          slide_instance_uids,
          filter_by_annotation_iod,
          filter_by_operator_id,
      )
  )
  return iter(
      dicom_path.FromPath(
          study_path,
          study_uid=candidate.get_value(tags.STUDY_INSTANCE_UID),
          series_uid=candidate.get_value(tags.SERIES_INSTANCE_UID),
          instance_uid=candidate.get_value(tags.SOP_INSTANCE_UID),
      )
      for candidate in accepted
  )
class DicomSlide(_DicomSeries):
"""Represents a DICOM pathology slide stored in a DICOMStore."""
def __init__(
    self,
    dwi: dicom_web_interface.DicomWebInterface,
    path: dicom_path.Path,
    enable_client_slide_frame_decompression: bool = True,
    accession_number: Optional[str] = None,
    pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
    logging_factory: Optional[
        ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
    ] = None,
    slide_frame_cache: Optional[
        local_dicom_slide_cache.InMemoryDicomSlideCache
    ] = None,
    json_metadata: Union[str, Mapping[str, Any]] = '',
    instances: Optional[Collection[dicom_web_interface.DicomObject]] = None,
):
  """Constructor.

  The level map is restored from json_metadata when non-empty metadata is
  given; otherwise it is built from instances, or from the instances the
  DicomWebInterface returns for the series when instances is None.

  Args:
    dwi: The DicomWebInterface that has been configured to be able to access
      the series referenced by the input path.
    path: The path to a DICOM series object in a DICOMStore; reduced to its
      series path.
    enable_client_slide_frame_decompression: Set to True to enable client side
      frame decompression optimization (Recommended value = True); remove
      parameter following GG validation.
    accession_number: The accession_number of the slide.
    pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
      difference between row and column pixel spacings.
    logging_factory: The factory that EZ WSI uses to construct a logging
      interface.
    slide_frame_cache: Initialize to use shared slide cache.
    json_metadata: Optional slide level metadata, as a JSON string or an
      already decoded mapping.
    instances: Optional list of instances to use to initialize the slide.

  Raises:
    ez_wsi_errors.InvalidSlideJsonMetadataError: json_metadata cannot be
      decoded or is incorrectly formatted.
    ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError: The slide path in
      json_metadata differs from path.
  """
  path = path.GetSeriesPath()
  super().__init__(
      dwi,
      path,
      enable_client_slide_frame_decompression,
      accession_number,
      logging_factory,
      slide_frame_cache,
  )
  # Metadata supplied as a string is decoded first.
  if json_metadata and isinstance(json_metadata, str):
    try:
      json_metadata = json.loads(json_metadata)
    except json.JSONDecodeError as exp:
      raise ez_wsi_errors.InvalidSlideJsonMetadataError(
          'Error decoding JSON metadata.'
      ) from exp
  if not json_metadata:
    # No metadata, or metadata that decoded to something empty: build the
    # level map from DICOM instances.
    series_instances = dwi.get_instances(path) if instances is None else instances
    self._level_map = slide_level_map.SlideLevelMap(
        series_instances,
        pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
    )
  else:
    try:
      slide_path = str(path)
      metadata_path = str(
          dicom_path.Path.from_dict(json_metadata[_SLIDE_PATH])
      )
      if slide_path != metadata_path:
        raise ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError(
            'Slide path does not match slide path in json metadata.'
        )
      self._level_map = slide_level_map.SlideLevelMap.create_from_json(
          json_metadata[LEVEL_MAP],
          pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      )
    except (TypeError, IndexError, KeyError) as exp:
      raise ez_wsi_errors.InvalidSlideJsonMetadataError(
          'Incorrectly formatted JSON metadata.'
      ) from exp
  self._native_level = _get_native_level(self._level_map)
def __copy__(self) -> DicomSlide:
  """Returns a shallow copy of the slide with its own copy of the level map.

  All other attributes are shared with the original; __init__ is not run.
  """
  duplicate = DicomSlide.__new__(DicomSlide)
  duplicate.__dict__.update(self.__dict__)
  duplicate._level_map = copy.copy(self._level_map)
  return duplicate
@property
def native_level(self) -> slide_level_map.Level:
  """Native level determined at construction.

  Raises:
    ez_wsi_errors.LevelNotFoundError: The slide has no native level.
  """
  level = self._native_level
  if level is not None:
    return level
  raise ez_wsi_errors.LevelNotFoundError(
      'Slide does not define pyramid imaging.'
  )
@property
def total_pixel_matrix_columns(self) -> int:
  """Width of the native level."""
  native = self.native_level
  return native.width
@property
def total_pixel_matrix_rows(self) -> int:
  """Height of the native level."""
  native = self.native_level
  return native.height
@property
def pixel_format(self) -> np.dtype:
  """Pixel format of the native level."""
  native = self.native_level
  return native.pixel_format
@property
def native_pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
  """Pixel spacing of the native level."""
  native = self.native_level
  return native.pixel_spacing
def set_icc_profile_bytes(self, icc_profile_bytes: bytes) -> None:
  """Stores ICC profile bytes on the slide's level map.

  Args:
    icc_profile_bytes: ICC profile bytes handed to the level map.
  """
  level_map = self._level_map
  level_map.set_icc_profile_bytes(icc_profile_bytes)
def get_json_encoded_icc_profile_size(self) -> int:
  """Returns the JSON encoded ICC profile size reported by the level map."""
  level_map = self._level_map
  return level_map.get_json_encoded_icc_profile_size()
def get_icc_profile_bytes(self) -> bytes:
  """Returns ICC profile bytes for the pyramid.

  The level map is given the slide's DICOMweb interface for the lookup.
  """
  level_map = self._level_map
  return level_map.get_icc_profile_bytes(self._dwi)
def are_instances_concatenated(self, instance_uids: list[str]) -> bool:
  """Returns True if the instances provided are concatenated.

  This also indicates if the instances are of the same pixel spacing. The
  decision is delegated to the slide's level map.

  Args:
    instance_uids: A list of SOP Instance UIDs to check.

  Returns:
    True if the instances are concatenated or if only one instance uid is
    provided. Otherwise returns False.
  """
  level_map = self._level_map
  return level_map.are_instances_concatenated(instance_uids)
def get_patch_bounds_dicom_instance_frame_numbers(
    self,
    image_level: Union[
        slide_level_map.Level,
        slide_level_map.ResizedLevel,
        pixel_spacing_module.PixelSpacing,
    ],
    patch_bounds_list: List[PatchBounds],
) -> Mapping[str, List[int]]:
  """Returns Map[DICOM instances: frame numbers] that fall in patch bounds.

  Extends the base implementation by also accepting a PixelSpacing, which is
  converted to a level first.

  Args:
    image_level: Level, resized level, or pixel spacing that patch represents.
    patch_bounds_list: List of PatchBounds to return frame indexes for.

  Returns:
    Mapping between DICOM instances, path, and list of frames numbers required
    to render patches.
  """
  if isinstance(image_level, pixel_spacing_module.PixelSpacing):
    image_level = _pixel_spacing_to_level(self, image_level)
  return super().get_patch_bounds_dicom_instance_frame_numbers(
      image_level, patch_bounds_list
  )
def create_icc_profile_transformation(
    self,
    icc_profile: Union[bytes, ImageCms.core.CmsProfile],
    rendering_intent: ImageCms.Intent = ImageCms.Intent.PERCEPTUAL,
) -> Optional[ImageCms.ImageCmsTransform]:
  """Returns transformation from the pyramid colorspace to icc_profile.

  The source profile is the slide's own ICC profile; the work is done by the
  module level create_icc_profile_transformation().

  Args:
    icc_profile: ICC Profile to transform DICOM pyramid imaging to.
    rendering_intent: Rendering intent to use in transformation.

  Returns:
    PIL.ImageCmsTransformation to transform pixel imaging or None.
  """
  slide_profile_bytes = self.get_icc_profile_bytes()
  return create_icc_profile_transformation(
      slide_profile_bytes, icc_profile, rendering_intent
  )
def get_image(
    self,
    pixel_spacing: Union[
        slide_level_map.Level,
        slide_level_map.ResizedLevel,
        pixel_spacing_module.PixelSpacing,
    ],
) -> DicomImage:
  """Returns a DicomImage of the slide.

  Args:
    pixel_spacing: Level or resized level of the image, or a pixel spacing,
      which is converted to a level first.
  """
  image_level = (
      _pixel_spacing_to_level(self, pixel_spacing)
      if isinstance(pixel_spacing, pixel_spacing_module.PixelSpacing)
      else pixel_spacing
  )
  return DicomImage(image_level, self)
def get_patch(
    self,
    level: Union[
        slide_level_map.Level,
        slide_level_map.ResizedLevel,
        pixel_spacing_module.PixelSpacing,
    ],
    x: int,
    y: int,
    width: int,
    height: int,
    require_fully_in_source_image: bool = False,
) -> DicomPatch:
  """Gets a patch from the slide.

  Extends the base implementation by also accepting a PixelSpacing, which is
  converted to a level first.

  Args:
    level: Level, resized level, or pixel spacing to generate patch from.
    x: The X coordinate of the patch position in level.
    y: The Y coordinate of the patch position in level.
    width: The width of the patch in level.
    height: The height of the patch in level.
    require_fully_in_source_image: Require patch dimensions fully exist in
      level.

  Returns:
    Returns the requested patch.
  """
  resolved_level = (
      _pixel_spacing_to_level(self, level)
      if isinstance(level, pixel_spacing_module.PixelSpacing)
      else level
  )
  return super().get_patch(
      resolved_level,
      x,
      y,
      width,
      height,
      require_fully_in_source_image=require_fully_in_source_image,
  )
def get_frame(
    self,
    level: Union[slide_level_map.Level, pixel_spacing_module.PixelSpacing],
    frame_number: int,
) -> Optional[Frame]:
  """Gets a single frame of a level.

  The DICOMWeb API serves image pixels by the unit of frames. Frames have
  fixed size (width and height). Call get_patch() instead to get an image
  patch at a specific location, or with a specific dimension.

  Extends the base implementation by also accepting a PixelSpacing, which is
  converted to a level first.

  Args:
    level: Source pyramid level of the frame, or a pixel spacing from which
      the level is derived.
    frame_number: The frame number to be fetched. The frames are stored in
      arrays with 1-based indexing.

  Returns:
    Returns the requested frame if exists, None otherwise.

  Raises:
    InputFrameNumberOutOfRangeError if the input frame_number is
    out of range.
  """
  resolved_level = (
      _pixel_spacing_to_level(self, level)
      if isinstance(level, pixel_spacing_module.PixelSpacing)
      else level
  )
  return super().get_frame(resolved_level, frame_number)
def json_metadata_dict(
    self,
    level_subset: Optional[List[slide_level_map.Level]] = None,
    max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
) -> Mapping[str, Any]:
  """Returns JSON dict metadata for the slide.

  Args:
    level_subset: Forwarded to the level map's to_dict().
    max_json_encoded_icc_profile_size: Forwarded to the level map's
      to_dict().

  Returns:
    Dict with the whole slide microscopy SOP class UID, the slide path as a
    dict, and the level map as a dict.
  """
  metadata = {}
  metadata[_SOP_CLASS_UID] = (
      slide_level_map.VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID
  )
  metadata[_SLIDE_PATH] = self.path.to_dict()
  metadata[LEVEL_MAP] = self._level_map.to_dict(
      level_subset=level_subset,
      max_json_encoded_icc_profile_size=max_json_encoded_icc_profile_size,
  )
  return metadata
def __eq__(self, other: Any) -> bool:
  """Two DicomSlides are equal when their paths have the same string form."""
  if not isinstance(other, DicomSlide):
    return False
  return str(self.path) == str(other.path)
@property
def all_pixel_spacing_mms(self) -> list[float]:
  """Lists the pixel spacing, in mm, of every level that defines one."""
  spacings_mm = []
  for level in self._level_map.level_map.values():
    if level.pixel_spacing.is_defined:
      spacings_mm.append(level.pixel_spacing.pixel_spacing_mm)
  return spacings_mm
def get_level_by_pixel_spacing(
    self,
    pixel_spacing: pixel_spacing_module.PixelSpacing,
    relative_pixel_spacing_equality_threshold: float = slide_level_map.MAX_LEVEL_DIST,
    maximum_downsample: float = 0.0,
) -> Optional[slide_level_map.Level]:
  """Gets the level corresponding to the input pixel spacing.

  Args:
    pixel_spacing: The pixel spacing to use for level lookup; its value in
      millimeters is what the level map is queried with.
    relative_pixel_spacing_equality_threshold: Maximum relative difference in
      (pyramid level pixel spacing / desired pixel spacing) at which pixel
      spacings are considered equivalent. E.g., with a value of 0.25, pyramid
      levels whose pixel spacing is 25% smaller to 25% larger are considered
      equivalent. Pathology imaging is typically represented as a pyramid
      whose levels are 200% or greater relative magnifications of each other.
    maximum_downsample: Maximum degree to which it is acceptable to downsample
      pixel imaging to achieve a desired target pixel spacing. Only used when
      source imaging pixel spacing is < and != to desired target pixel
      spacing.

  Returns:
    The level corresponding to the input pixel spacing. None if the requested
    pixel spacing does not exist.
  """
  target_spacing_mm = pixel_spacing.pixel_spacing_mm
  return self._level_map.get_level_by_pixel_spacing(
      target_spacing_mm,
      relative_pixel_spacing_equality_threshold=relative_pixel_spacing_equality_threshold,
      maximum_downsample=maximum_downsample,
  )
def get_closest_level_with_pixel_spacing_equal_or_less_than_target(
    self,
    pixel_spacing: pixel_spacing_module.PixelSpacing,
) -> Optional[slide_level_map.Level]:
  """Gets the closest level whose pixel spacing is <= the target spacing.

  Args:
    pixel_spacing: The target pixel spacing; its value in millimeters is what
      the level map is queried with.

  Returns:
    The equal_or_smaller_pixel_spacing entry of the level map's closest-level
    lookup; may be None.
  """
  closest_levels = self._level_map.get_levels_with_closest_pixel_spacing(
      pixel_spacing.pixel_spacing_mm
  )
  return closest_levels.equal_or_smaller_pixel_spacing
def get_closest_level_with_pixel_spacing_greater_than_target(
    self,
    pixel_spacing: pixel_spacing_module.PixelSpacing,
) -> Optional[slide_level_map.Level]:
  """Gets the closest level whose pixel spacing is > the target spacing.

  Args:
    pixel_spacing: The target pixel spacing; its value in millimeters is what
      the level map is queried with.

  Returns:
    The greater_pixel_spacing entry of the level map's closest-level lookup;
    may be None.
  """
  closest_levels = self._level_map.get_levels_with_closest_pixel_spacing(
      pixel_spacing.pixel_spacing_mm
  )
  return closest_levels.greater_pixel_spacing
def get_instance_pixel_spacing(
    self, instance_uid: str
) -> Optional[pixel_spacing_module.PixelSpacing]:
  """Given an Instance UID retrieves its corresponding PixelSpacing.

  The lookup is delegated to the slide's level map.

  Args:
    instance_uid: A SOP Instance UID to find.

  Returns:
    PixelSpacing of the instance.

  Raises:
    PixelSpacingNotFoundForInstanceError if the instance is not in the level
    map.
  """
  level_map = self._level_map
  return level_map.get_instance_pixel_spacing(instance_uid)
@property
def thumbnail(self) -> Optional[slide_level_map.Level]:
  """Thumbnail level held by the level map, if any."""
  level_map = self._level_map
  return level_map.thumbnail
@property
def label(self) -> Optional[slide_level_map.Level]:
  """Label level held by the level map, if any."""
  level_map = self._level_map
  return level_map.label
@property
def overview(self) -> Optional[slide_level_map.Level]:
  """Overview level held by the level map, if any."""
  level_map = self._level_map
  return level_map.overview
@property
def levels(self) -> Iterator[slide_level_map.Level]:
  """Returns iterator over the levels stored in the slide's level map."""
  levels_by_index = self._level_map.level_map
  return iter(levels_by_index.values())
def get_level_by_index(
    self, index: slide_level_map.LevelIndexType
) -> Optional[slide_level_map.Level]:
  """Returns the level the slide's level map reports for the given index.

  Args:
    index: Index identifying the level in the level map.

  Returns:
    The result of the level map's `get_level` lookup for the index.
  """
  level_map = self._level_map
  return level_map.get_level(index)
@property
def wsi_label_overview_thumbnail_levels(
    self,
) -> Iterator[slide_level_map.Level]:
  """Returns an iterator over the thumbnail, label and overview levels.

  Levels are yielded in that order; any of the three that is None is skipped.
  """
  defined_levels = []
  for candidate in (self.thumbnail, self.label, self.overview):
    if candidate is None:
      continue
    defined_levels.append(candidate)
  return iter(defined_levels)
@property
def all_levels(self) -> Iterator[slide_level_map.Level]:
  """Returns an iterator over every level defined on the slide.

  The levels from `levels` come first, followed by those from
  `wsi_label_overview_thumbnail_levels`.
  """
  pyramid_levels = self.levels
  ancillary_levels = self.wsi_label_overview_thumbnail_levels
  return itertools.chain(pyramid_levels, ancillary_levels)
def _filter_dicom_object(
    self,
    instance: dicom_web_interface.DicomObject,
    slide_instances_uid_set: set[str],
    filter_by_annotation_iod: Optional[str],
    filter_by_operator_id: Optional[str],
) -> bool:
  """Tests whether a DICOM instance passes the annotation filters.

  An instance is accepted when all of the following hold:
    1. If filter_by_annotation_iod is set, the instance's SOPClassUID equals
       it.
    2. The instance is tied to this slide: it is in the slide's series, or the
       first item of its ReferencedSeriesSequence names the slide's series, or
       the first item of its ReferencedImageSequence names one of the slide's
       SOP Instance UIDs.
    3. If filter_by_operator_id is set, some PersonIdentificationCodeSequence
       entry under OperatorIdentificationSequence has a LongCodeValue equal
       to it.

  Args:
    instance: DICOM object whose `dicom_tags` JSON is parsed and inspected.
    slide_instances_uid_set: SOP Instance UIDs of the slide's instances.
    filter_by_annotation_iod: SOP Class UID the instance must have; falsy
      disables this filter.
    filter_by_operator_id: Operator ID the instance must carry; falsy disables
      this filter.

  Returns:
    True if the instance passes every enabled filter, False otherwise.
  """
  ds = pydicom.Dataset.from_json(instance.dicom_tags)
  # Filter by provided IOD.
  if filter_by_annotation_iod and ds.SOPClassUID != filter_by_annotation_iod:
    return False

  # Check if instance is referencing the slide's series or one of its
  # instances.
  slide_series_uid = self.path.series_uid
  reference_found = ds.SeriesInstanceUID == slide_series_uid
  if not reference_found:
    try:
      # https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/common-instance-reference/00081115/0020000e
      reference_found = (
          ds.ReferencedSeriesSequence[0].SeriesInstanceUID == slide_series_uid
      )
    except (AttributeError, IndexError):
      # Sequence (or its UID) is absent, or the sequence is empty; treat as
      # "no reference" instead of failing the whole search.
      pass
  if not reference_found:
    try:
      # https://dicom.innolitics.com/ciods/microscopy-bulk-simple-annotations/microscopy-bulk-simple-annotations/00081140/00081155
      reference_found = (
          ds.ReferencedImageSequence[0].ReferencedSOPInstanceUID
          in slide_instances_uid_set
      )
    except (AttributeError, IndexError):
      pass
  if not reference_found:
    return False

  # Filter by operator ID.
  if not filter_by_operator_id:
    return True
  try:
    operators = ds.OperatorIdentificationSequence
  except AttributeError:
    return False
  for operator in operators:
    try:
      identification_codes = operator.PersonIdentificationCodeSequence
    except AttributeError:
      # An operator entry without identification codes must not hide matches
      # in the remaining entries.
      continue
    for identification_code in identification_codes:
      try:
        if identification_code.LongCodeValue == filter_by_operator_id:
          return True
      except AttributeError:
        continue
  return False
def find_annotation_instances(
    self,
    annotation_dicom_store: Optional[dicom_path.Path] = None,
    filter_by_annotation_iod: Optional[
        str
    ] = MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE,
    filter_by_operator_id: Optional[str] = None,
) -> Iterator[dicom_path.Path]:
  """Finds annotation instances associated with the slide.

  The annotation instances must either reference the slide's series UID or
  have the same series UID as the slide. The search is delegated to
  `_find_annotation_instances` using the levels returned by `self.levels`.

  Args:
    annotation_dicom_store: The DICOM store to search for annotation
      instances. We assume here that the study/series UID stays the same
      between the slide and annotation DICOM stores.
    filter_by_annotation_iod: The annotation IOD to filter by. Default is
      Microscopy-Bulk-Simple-Annotations.
      https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
    filter_by_operator_id: The operator ID to filter by. This searches the
      Operator Identification Sequence long code value for the provided ID.
      https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/general-series/00081072/00401101/00080119

  Returns:
    An iterator over the paths of the matching annotation instances.
  """
  search_annotations = self._find_annotation_instances
  levels_to_search = self.levels
  return search_annotations(
      levels_to_search,
      annotation_dicom_store,
      filter_by_annotation_iod,
      filter_by_operator_id,
  )
class DicomMicroscopeImage(_DicomSeries):
  """Non-tiled DICOM microscope imaging of a series stored in a DICOMStore.

  Level metadata is kept in a slide_level_map.UntiledImageMap that is built
  either from previously exported JSON metadata or from DICOM instances.
  """

  def __init__(
      self,
      dwi: dicom_web_interface.DicomWebInterface,
      path: dicom_path.Path,
      enable_client_slide_frame_decompression: bool = True,
      accession_number: Optional[str] = None,
      pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
      logging_factory: Optional[
          ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
      ] = None,
      slide_frame_cache: Optional[
          local_dicom_slide_cache.InMemoryDicomSlideCache
      ] = None,
      json_metadata: Union[str, Mapping[str, Any]] = '',
      instances: Optional[Collection[dicom_web_interface.DicomObject]] = None,
  ):
    """Initializes the image from JSON metadata or from DICOM instances.

    Args:
      dwi: The DicomWebInterface that has been configured to be able to access
        the series referenced by the input path.
      path: The path to a DICOM series object in a DICOMStore.
      enable_client_slide_frame_decompression: Set to True to enable client
        side frame decompression optimization (Recommended value = True);
        remove parameter following GG validation.
      accession_number: The accession_number of the slide.
      pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
        difference between row and column pixel spacings.
      logging_factory: The factory that EZ WSI uses to construct a logging
        interface.
      slide_frame_cache: Initialize to use shared slide cache.
      json_metadata: Optional json formatted slide level metadata, given as a
        JSON string or an already decoded mapping. Empty values are ignored.
      instances: Optional instances used to initialize the image when no JSON
        metadata is given; when None they are fetched through dwi.

    Raises:
      ez_wsi_errors.InvalidSlideJsonMetadataError: The metadata string is not
        valid JSON, or the decoded metadata is incorrectly formatted.
      ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError: The slide path in
        the metadata differs from path.
    """
    super().__init__(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        accession_number,
        logging_factory,
        slide_frame_cache,
    )
    if not json_metadata:
      # Empty metadata is ignored; build the level map from DICOM instances,
      # querying the store only when the caller did not supply them.
      source_instances = (
          dwi.get_instances(path) if instances is None else instances
      )
      self._non_tiled_levels = slide_level_map.UntiledImageMap(
          source_instances,
          pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      )
      return

    metadata = json_metadata
    if isinstance(metadata, str):
      # Metadata supplied as text; decode it before use.
      try:
        metadata = json.loads(metadata)
      except json.JSONDecodeError as exp:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata.'
        ) from exp
    try:
      requested_path = str(path)
      metadata_path = str(dicom_path.Path.from_dict(metadata[_SLIDE_PATH]))
      if requested_path != metadata_path:
        raise ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError(
            'Slide path does not match slide path in json metadata.'
        )
      untiled_map = slide_level_map.UntiledImageMap.create_from_json(
          metadata[UNTILED_MICROSCOPE_IMAGE_MAP],
          pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      )
      self._non_tiled_levels = untiled_map
    except (TypeError, IndexError, KeyError) as exp:
      raise ez_wsi_errors.InvalidSlideJsonMetadataError(
          'Incorrectly formatted JSON metadata.'
      ) from exp

  def __copy__(self) -> DicomMicroscopeImage:
    """Returns a shallow copy that owns its own copy of the untiled level map."""
    clone = DicomMicroscopeImage.__new__(DicomMicroscopeImage)
    clone.__dict__.update(self.__dict__)
    clone._non_tiled_levels = copy.copy(self._non_tiled_levels)
    return clone

  def json_metadata_dict(
      self,
      level_subset: Optional[List[slide_level_map.Level]] = None,
      max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
  ) -> Mapping[str, Any]:
    """Returns json formatted slide level metadata.

    Args:
      level_subset: List of levels defined on the slide to selectively export
        metadata for.
      max_json_encoded_icc_profile_size: Max size of JSON to return; not used.

    Returns:
      Json formatted metadata holding the SOP class UID, the slide path and
      the untiled image map.

    Raises:
      ez_wsi_errors.LevelNotFoundError: Level not defined on Slide.
    """
    del max_json_encoded_icc_profile_size  # Intentionally unused.
    metadata = {
        _SOP_CLASS_UID: slide_level_map.VL_MIROSCOPIC_IMAGE_SOP_CLASS_UID,
    }
    metadata[_SLIDE_PATH] = self.path.to_dict()
    metadata[UNTILED_MICROSCOPE_IMAGE_MAP] = self._non_tiled_levels.to_dict(
        level_subset=level_subset,
    )
    return metadata

  def __eq__(self, other: Any) -> bool:
    """Two microscope images are equal when their paths stringify the same."""
    if not isinstance(other, DicomMicroscopeImage):
      return False
    return str(self.path) == str(other.path)

  def are_instances_concatenated(self, instance_uids: list[str]) -> bool:
    """Always returns False; the instance UIDs are not inspected."""
    del instance_uids
    return False

  @property
  def levels(self) -> Iterator[slide_level_map.Level]:
    """Returns an iterator over the levels in the untiled image map."""
    untiled_images = self._non_tiled_levels.untiled_image_map
    return iter(untiled_images.values())

  def get_level_by_index(
      self, index: slide_level_map.LevelIndexType
  ) -> Optional[slide_level_map.Level]:
    """Returns the untiled level stored under index, or None if not present."""
    untiled_images = self._non_tiled_levels.untiled_image_map
    return untiled_images.get(index, None)

  @property
  def all_levels(self) -> Iterator[slide_level_map.Level]:
    """Returns the same iterator as `levels`."""
    return self.levels

  def get_level_icc_profile_bytes(
      self, level: Union[slide_level_map.Level, slide_level_map.ResizedLevel]
  ) -> bytes:
    """Returns ICC Profile bytes for a level.

    Args:
      level: Level to get the profile for; a ResizedLevel is replaced by its
        source level before the lookup.

    Returns:
      ICC profile bytes reported by the untiled image map.
    """
    source = (
        level.source_level
        if isinstance(level, slide_level_map.ResizedLevel)
        else level
    )
    return self._non_tiled_levels.get_level_icc_profile_bytes(
        source, self._dwi
    )

  def find_annotation_instances(
      self,
      level: slide_level_map.Level,
      annotation_dicom_store: Optional[dicom_path.Path] = None,
      filter_by_annotation_iod: Optional[
          str
      ] = MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE,
      filter_by_operator_id: Optional[str] = None,
  ) -> Iterator[dicom_path.Path]:
    """Finds annotation instances associated with one image level.

    The annotation instances must either reference the slide's series UID or
    have the same series UID as the slide. The search is delegated to
    `_find_annotation_instances` with an iterator holding only `level`.

    Args:
      level: The image to search for annotation instances.
      annotation_dicom_store: The DICOM store to search for annotation
        instances. We assume here that the study/series UID stays the same
        between the slide and annotation DICOM stores.
      filter_by_annotation_iod: The annotation IOD to filter by. Default is
        Microscopy-Bulk-Simple-Annotations.
        https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
      filter_by_operator_id: The operator ID to filter by. This searches the
        Operator Identification Sequence long code value for the provided ID.
        https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/general-series/00081072/00401101/00080119

    Returns:
      An iterator over the paths of the matching annotation instances.
    """
    search_annotations = self._find_annotation_instances
    single_level = iter([level])
    return search_annotations(
        single_level,
        annotation_dicom_store,
        filter_by_annotation_iod,
        filter_by_operator_id,
    )
def _add_sop_class_uid(
    sop_class_uid: str, sop_class_uid_found: Set[str]
) -> None:
  """Records a SOP class UID if it identifies supported microscope imaging.

  The UID is added to sop_class_uid_found (mutated in place) only when it is
  the VL whole slide microscopy image SOP class UID or one of the untiled
  image SOP class UIDs; any other UID is ignored.

  Args:
    sop_class_uid: SOP class UID to test.
    sop_class_uid_found: Set collecting the recognized SOP class UIDs.
  """
  is_whole_slide_image = (
      sop_class_uid
      == slide_level_map.VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID
  )
  if (
      is_whole_slide_image
      or sop_class_uid in slide_level_map.UNTILED_IMAGE_SOP_CLASS_UID
  ):
    sop_class_uid_found.add(sop_class_uid)
def _load_series(
    dwi: dicom_web_interface.DicomWebInterface,
    path: dicom_path.Path,
    enable_client_slide_frame_decompression: bool,
    json_metadata: Union[str, Mapping[str, Any]],
    instances: Optional[Collection[dicom_web_interface.DicomObject]],
    sop_class_uid_found: Set[str],
    pixel_spacing_diff_tolerance: float,
    logging_factory: Optional[
        ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
    ],
) -> Tuple[Optional[DicomSlide], Optional[DicomMicroscopeImage]]:
  """Loads DicomSlide and/or DicomMicroscopeImage defined on a series.

  Args:
    dwi: DicomWebInterface handed to the constructed objects.
    path: Path to the DICOM series.
    enable_client_slide_frame_decompression: Forwarded to the constructors.
    json_metadata: JSON metadata forwarded to the constructors.
    instances: Instances forwarded to the constructors.
    sop_class_uid_found: SOP class UIDs found for the series; decides which
      objects get constructed.
    pixel_spacing_diff_tolerance: Forwarded to the constructors.
    logging_factory: Forwarded to the constructors.

  Returns:
    Tuple (DicomSlide, DicomMicroscopeImage). The first entry is None unless
    the VL whole slide microscopy SOP class UID was found; the second is None
    unless at least one untiled image SOP class UID was found.
  """
  # Keyword arguments common to both constructors.
  shared_kwargs = dict(
      json_metadata=json_metadata,
      instances=instances,
      pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      logging_factory=logging_factory,
  )
  vl_whole_slide_image = None
  microscope_image = None
  if (
      slide_level_map.VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID
      in sop_class_uid_found
  ):
    vl_whole_slide_image = DicomSlide(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        **shared_kwargs,
    )
  if slide_level_map.UNTILED_IMAGE_SOP_CLASS_UID.intersection(
      sop_class_uid_found
  ):
    microscope_image = DicomMicroscopeImage(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        **shared_kwargs,
    )
  return (vl_whole_slide_image, microscope_image)
class DicomMicroscopeSeries:
  """Holds the microscope images defined on a DICOM series.

  After construction `dicom_slide` holds the series' DicomSlide (or None) and
  `dicom_microscope_image` holds its DicomMicroscopeImage (or None).
  """

  def __init__(
      self,
      dwi: dicom_web_interface.DicomWebInterface,
      path: dicom_path.Path,
      enable_client_slide_frame_decompression: bool = True,
      json_metadata: Union[str, Mapping[str, Any]] = '',
      instance_uids: Optional[list[str]] = None,
      pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
      logging_factory: Optional[
          ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
      ] = None,
  ):
    """Loads the imaging defined on a series.

    When json_metadata is given the imaging is restored from it and the DICOM
    store is not queried here; instance_uids is not consulted on that path.
    Otherwise the series' instances are fetched through dwi and their SOP
    class UIDs decide which objects are created.

    Args:
      dwi: DicomWebInterface used to access the series.
      path: Path to the DICOM series.
      enable_client_slide_frame_decompression: Forwarded to the loaded
        objects.
      json_metadata: Optional JSON metadata (string or decoded mapping).
      instance_uids: Optional SOP Instance UIDs; when given, only instances
        with these UIDs contribute to SOP class detection.
      pixel_spacing_diff_tolerance: Forwarded to the loaded objects.
      logging_factory: Forwarded to the loaded objects.

    Raises:
      ez_wsi_errors.InvalidSlideJsonMetadataError: json_metadata cannot be
        decoded, lacks the SOP class UID entry, or does not describe
        recognized DICOM imaging.
    """
    recognized_sop_classes = set()
    if json_metadata:
      # Decode metadata given as a string, then read its SOP class UID.
      try:
        metadata = (
            json.loads(json_metadata)
            if isinstance(json_metadata, str)
            else json_metadata
        )
        _add_sop_class_uid(metadata[_SOP_CLASS_UID], recognized_sop_classes)
      except (json.JSONDecodeError, KeyError, TypeError, IndexError) as exp:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata.'
        ) from exp
      slide, microscope_image = _load_series(
          dwi,
          path,
          enable_client_slide_frame_decompression,
          metadata,
          None,
          recognized_sop_classes,
          pixel_spacing_diff_tolerance,
          logging_factory,
      )
      self.dicom_slide = slide
      self.dicom_microscope_image = microscope_image
      if slide is None and microscope_image is None:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata does not encode DICOM imaging.'
        )
      return

    dicom_instances = dwi.get_instances(path)
    for instance in dicom_instances:
      # Skip instances outside the requested subset, if one was given.
      if (
          instance_uids is not None
          and instance.get_value(tags.SOP_INSTANCE_UID) not in instance_uids
      ):
        continue
      _add_sop_class_uid(
          instance.get_value(tags.SOP_CLASS_UID),
          recognized_sop_classes,
      )
    loaded = _load_series(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        {},
        dicom_instances,
        recognized_sop_classes,
        pixel_spacing_diff_tolerance,
        logging_factory,
    )
    self.dicom_slide, self.dicom_microscope_image = loaded