ez_wsi_dicomweb/slide_level_map.py (1,079 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""SlideLevelMap defines the relationship between frames and DICOM instances."""
from __future__ import annotations
import base64
import collections
import copy
import dataclasses
import enum
import io
import json
import math
import threading
import typing
from typing import Any, Collection, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
import cachetools
import dataclasses_json
from ez_wsi_dicomweb import dicom_web_interface
from ez_wsi_dicomweb import ez_wsi_errors
from ez_wsi_dicomweb import pixel_spacing as pixel_spacing_module
from ez_wsi_dicomweb.ml_toolkit import dicom_json
from ez_wsi_dicomweb.ml_toolkit import dicom_path
from ez_wsi_dicomweb.ml_toolkit import tags
import numpy as np
import pydicom
# Since the level distance is a log function, this max distance value allows
# a level that has a pixel spacing ratio within range of [2^-0.3, 2^0.3] to be
# matched as the closest level by get_level_by_pixel_spacing() function.
MAX_LEVEL_DIST = 0.23
_CONCATENATION_UID = tags.DicomTag(number='00209161', vr='UI')
# Pyramid ICC profiles are optimally serialized in JSON to avoid repeative
# re-initialization. However, some digital pathology DICOM, e.g. Leica, have
# very large ICC profiles, e.g., 12 MB. The default max size of the ICC profile
# controls the maximum size of the ICC profile serialized in JSON.
DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES = 204800
# https://dicom.nema.org/dicom/2013/output/chtml/part03/sect_A.32.html#sect_A.32.8
VL_MIROSCOPIC_IMAGE_SOP_CLASS_UID = '1.2.840.10008.5.1.4.1.1.77.1.2'
_VL_SLIDE_COORDINATES_MIROSCOPIC_IMAGE_SOP_CLASS_UID = (
'1.2.840.10008.5.1.4.1.1.77.1.3'
)
VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID = '1.2.840.10008.5.1.4.1.1.77.1.6'
UNTILED_IMAGE_SOP_CLASS_UID = frozenset([
VL_MIROSCOPIC_IMAGE_SOP_CLASS_UID,
_VL_SLIDE_COORDINATES_MIROSCOPIC_IMAGE_SOP_CLASS_UID,
])
_VOLUME = 'VOLUME'
_THUMBNAIL = 'THUMBNAIL'
_LABEL = 'LABEL'
_OVERVIEW = 'OVERVIEW'
_LABEL_OVERVIEW_THUMBNAIL_LEVEL_SET = frozenset([_THUMBNAIL, _LABEL, _OVERVIEW])
LevelIndexType = Union[str, int]
@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class Instance:
"""Wrapper of a DICOM instance object, with details for frame lookup.
Used by the Level class to represent an instance in a specific
magnification level.
Attributes:
frame_offset: The frame index offset of the first possible frame from the
instance on the parent magnification level.
frame_count: Number of frames contained inside the instance.
dicom_object: The DicomObject of the instance.
path: Path to DICOM instance.
is_tiled_full: Returns true if instance is tiled full.
"""
frame_offset: int
frame_count: int
dicom_object: dicom_web_interface.DicomObject
@property
def path(self) -> dicom_path.Path:
return self.dicom_object.path
@property
def is_tiled_full(self) -> bool:
value = self.dicom_object.get_value(tags.DIMENSION_ORGANIZATION_TYPE)
if value is None or not isinstance(value, str):
return False
return value.upper() == 'TILED_FULL'
def instance_frame_number_from_wholes_slide_frame_number(
self, frame_number: int
) -> int:
"""Converts a frame_number to a frame_number within this instance.
Args:
frame_number: The frame_number to be converted from.
Returns:
The frame number within this instance.
"""
return frame_number - self.frame_offset
class ICCProfileMetadataState(enum.Enum):
UNINITIALIZED = 0
INITIALIZED = 1
@dataclasses_json.dataclass_json
@dataclasses.dataclass
class ICCProfileBytes:
icc_profile_bytes_as_b64_encoded_str: str = ''
metadata_state: ICCProfileMetadataState = (
ICCProfileMetadataState.UNINITIALIZED
)
@dataclasses.dataclass(frozen=True)
class ImageDimensions:
width_px: int
height_px: int
def copy(self) -> ImageDimensions:
return ImageDimensions(int(self.width_px), int(self.height_px))
class ResizedLevel:
"""DICOM pyramid level which is resized from existing level."""
def __init__(
self,
source_image_level: Level,
resized_level_dim: Union[
ImageDimensions, pixel_spacing_module.PixelSpacing
],
):
width_scale_factor, height_scale_factor = source_image_level.scale_factors(
resized_level_dim
)
if isinstance(resized_level_dim, pixel_spacing_module.PixelSpacing):
resized_level_dim = ImageDimensions(
max(1, int(source_image_level.width / width_scale_factor)),
max(1, int(source_image_level.height / height_scale_factor)),
)
self._source_image_level = source_image_level
self._resized_level_dim = resized_level_dim.copy()
if width_scale_factor >= 1.0 and height_scale_factor >= 1.0:
common_scale_factor = max(width_scale_factor, height_scale_factor)
elif width_scale_factor < 1.0 and height_scale_factor < 1.0:
common_scale_factor = min(width_scale_factor, height_scale_factor)
else:
common_scale_factor = 0
if (
common_scale_factor != 0
and int(source_image_level.height / common_scale_factor)
== self._resized_level_dim.height_px
and int(source_image_level.width / common_scale_factor)
== self._resized_level_dim.width_px
):
# If the larger resizing factor will result in the same dimensions
# then scale pixel spacing by common factor to keep pixels as a square
# as possible.
height_scale_factor = common_scale_factor
width_scale_factor = common_scale_factor
source_pixel_spacing = source_image_level.pixel_spacing
try:
self._resized_pixel_spacing = pixel_spacing_module.PixelSpacing(
source_pixel_spacing.column_spacing_mm * width_scale_factor,
source_pixel_spacing.row_spacing_mm * height_scale_factor,
source_pixel_spacing.mag_scaling_factor,
source_pixel_spacing.spacing_diff_tolerance,
)
except ez_wsi_errors.UndefinedPixelSpacingError:
# Source pixel spacing is undefined. Set resized imaging to
# undefined pixel spacing.
self._resized_pixel_spacing = source_pixel_spacing
self._width_scale_factor = width_scale_factor
self._height_scale_factor = height_scale_factor
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ResizedLevel):
return False
return (
(
self._source_image_level is other._source_image_level
or self._source_image_level == other._source_image_level
)
and self._resized_level_dim == other._resized_level_dim
and self._resized_pixel_spacing == other._resized_pixel_spacing
and self._width_scale_factor == other._width_scale_factor
and self._height_scale_factor == other._height_scale_factor
)
@property
def source_level(self) -> Level:
return self._source_image_level
@property
def width(self) -> int:
return self._resized_level_dim.width_px
@property
def height(self) -> int:
return self._resized_level_dim.height_px
@property
def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
return self._resized_pixel_spacing
def scale_factors(self) -> Tuple[float, float]:
"""Returns horizontal and vertical scale factors."""
return self._width_scale_factor, self._height_scale_factor
def _get_ps_from_level_or_ds_level(
level: Union[Level, ResizedLevel],
) -> pixel_spacing_module.PixelSpacing:
"""Returns pixel spacing class stored on Level or ResizedLevel."""
if isinstance(level, Level):
return typing.cast(Level, level).pixel_spacing
else:
return typing.cast(ResizedLevel, level).pixel_spacing
def _get_id_from_level_or_ds_level(
level: Union[Level, ResizedLevel],
) -> ImageDimensions:
"""Returns ImageDimensions of Level or ResizedLevel."""
if isinstance(level, Level):
level = typing.cast(Level, level)
else:
level = typing.cast(ResizedLevel, level)
return ImageDimensions(int(level.width), int(level.height))
@dataclasses_json.dataclass_json
@dataclasses.dataclass
class Level:
"""Represents the dimensions and instances of a specific magnification level.
Attributes:
level_index: The index of the level using 1-based indexing. A level with a
higher index corresponding to a lower magnification level.
width: The width, in number of pixels, of the entire level.
height: The height, in number of pixels, of the entire level.
samples_per_pixel: The number of samples per pixel. i.e. 3 for RGB format.
bits_allocated: The number of bits allocated for storing one pixel.
high_bit: The index of the highest bit in each sample of a pixel.
pixel_spacing: Pixel spacing of the slide level.
frame_width: The width, in pixels, of all frames on this level.
frame_height: The width, in pixels, of all frames on this level.
frame_number_min: The minimum frame number at this level.
frame_number_max: The maximum frame number at this level.
number_of_frames: Total number of frames at this level.
instances: All instances on this level, keyed by the frame_offset of the
instance. The instances should be ordered in the dictionary by the key.
transfer_syntax_uid: DICOM transfer syntax instances are encoded as.
tiled_full: Level is defined having tiled full organization.
"""
level_index: LevelIndexType
width: int
height: int
samples_per_pixel: int
bits_allocated: int
high_bit: int
pixel_spacing: pixel_spacing_module.PixelSpacing
frame_width: int
frame_height: int
frame_number_min: int
frame_number_max: int
instances: Dict[int, Instance]
transfer_syntax_uid: str
tiled_full: bool = dataclasses.field(init=False)
def __post_init__(self):
self.tiled_full = all([i.is_tiled_full for i in self.instances.values()])
@property
def number_of_frames(self) -> int:
return self.frame_number_max - self.frame_number_min + 1
@property
def pixel_format(self) -> np.dtype:
"""Gets the pixel format of the provided level.
Returns:
The pixel format of the level as numpy.dtype.
Raises:
UnsupportedPixelFormatError if pixel format is not supported.
"""
bytes_per_sample = math.ceil(self.bits_allocated / 8)
if bytes_per_sample == 1:
return np.uint8 # pytype: disable=bad-return-type # numpy-scalars
else:
raise ez_wsi_errors.UnsupportedPixelFormatError(
f'Pixel format not supported. BITS_ALLOCATED = {self.bits_allocated}'
)
def scale_factors(
self,
resized_level_dim: Union[
Level,
ResizedLevel,
ImageDimensions,
pixel_spacing_module.PixelSpacing,
],
) -> Tuple[float, float]:
"""Returns horizontal and vertical scale factors.
Args:
resized_level_dim: Level, ResizedLevel, ImageDimensions, or PixelSpacing
to calculate scaling factors.
Returns:
Tuple(Horizontal, Vertical) scaling factors necessary to convert parameter
dim to self dimensions.
Raises:
ez_wsi_errors.UndefinedPixelSpacingError: self does not have defined
pixel spacing and computing scaling factors based on pixel spacing or
if self has defined pixel spacing and the passed Level or
ResizedLevel does not.
"""
if isinstance(resized_level_dim, Level) or isinstance(
resized_level_dim, ResizedLevel
):
# If passed Level or ResizedLevel prefer to scale with Pixel Spacing
# scaling by pixel spacing will allow scale factors to be correctly
# computed between levels from different pyramids. Computing by dimension
# will not.
if self.pixel_spacing.is_defined:
resized_level_dim = _get_ps_from_level_or_ds_level(resized_level_dim)
else:
resized_level_dim = _get_id_from_level_or_ds_level(resized_level_dim)
if isinstance(resized_level_dim, pixel_spacing_module.PixelSpacing):
# Expected will raise ez_wsi_errors.UndefinedPixelSpacingError if
# passed pixel spacing and self does not define pixel spacing or
# Level or ResizedLevel does not contain defined pixel spacing.
sfx = (
resized_level_dim.column_spacing_mm
/ self.pixel_spacing.column_spacing_mm
)
sfy = resized_level_dim.row_spacing_mm / self.pixel_spacing.row_spacing_mm
else:
sfx = float(self.width) / float(resized_level_dim.width_px)
sfy = float(self.height) / float(resized_level_dim.height_px)
return sfx, sfy
def resize(
self,
resized_level_dim: Union[
Level,
ResizedLevel,
ImageDimensions,
pixel_spacing_module.PixelSpacing,
],
) -> Union[Level, ResizedLevel]:
"""resizes of self (level image) to pixel_spacing or dim of target.
Args:
resized_level_dim: Level, ResizedLevel, ImageDimensions, or PixelSpacing
to calculate scaling factors in relation to.
Returns:
Level if unchanged or ResizedLevel of self.
Raises:
ez_wsi_errors.UndefinedPixelSpacingError: self does not have defined
pixel spacing and computing scaling factors based on pixel spacing or
if self has defined pixel spacing and the passed Level or
ResizedLevel does not.
"""
if isinstance(resized_level_dim, Level) or isinstance(
resized_level_dim, ResizedLevel
):
if self.pixel_spacing.is_defined:
resized_level_dim = _get_ps_from_level_or_ds_level(resized_level_dim)
else:
resized_level_dim = _get_id_from_level_or_ds_level(resized_level_dim)
ds = ResizedLevel(self, resized_level_dim)
if ds.width == self.width and ds.height == self.height:
# if pixel dimensions of resized level are the return
# source image.
return self
return ds
def icc_profile_bulkdata_uri(self) -> str:
for instance in self.instances.values():
bulkdata_uri = instance.dicom_object.icc_profile_bulkdata_url
if bulkdata_uri:
return bulkdata_uri
return ''
def get_instance_by_frame(self, frame_number: int) -> Optional[Instance]:
"""Gets the instance that contains the requested frame.
Args:
frame_number: The frame number of the frame to request. The frame number
of a frame is stored in the DICOM tag
CONCATENATION_FRAME_OFFSET_NUMBER('00209228').
Returns:
The instance that contains the requested frame, or None if the input frame
is out of range of any instance.
"""
for frame_offset, instance in self.instances.items():
if frame_number <= frame_offset:
break
if frame_number <= frame_offset + instance.frame_count:
return instance
return None
def get_frame_number_by_point(self, x: int, y: int) -> int:
"""Gets the frame number corresponding to the input coordinate.
Args:
x: The x coordinate at the level the instance belongs to.
y: The y coordinate at the level the instance belongs to.
Returns:
The frame number that contains the input coordinate.
Raises:
CoordinateOutofImageDimensionsError if the input coordinate is out of the
range.
"""
if x < 0 or y < 0 or x >= self.width or y >= self.height:
raise ez_wsi_errors.CoordinateOutofImageDimensionsError(
f'The input coordinate {x, y} is out of the range: '
f'{0, 0, self.width - 1, self.height - 1}'
)
frame_x = int(x / self.frame_width)
frame_y = int(y / self.frame_height)
frames_per_row = int(math.ceil(float(self.width) / float(self.frame_width)))
return frame_y * frames_per_row + frame_x + 1
def get_frame_position(self, frame_number: int) -> Tuple[int, int]:
"""Gets the coordinate of the upper-left corner of the input frame.
Args:
frame_number: The input frame number to get the position of.
Returns:
The X and Y coordinates of the upper-left corner of the input frame.
Raises:
FrameNumberOutofBounds if the input frame number is out of the
range.
"""
if (
frame_number < self.frame_number_min
or frame_number > self.frame_number_max
):
raise ez_wsi_errors.FrameNumberOutofBoundsError(
f'The input frame number ({frame_number}) is out of the range: '
f'{self.frame_number_min, self.frame_number_max}'
)
frame_number = int(frame_number - 1)
frames_per_row = int(math.ceil(float(self.width) / float(self.frame_width)))
frame_x = frame_number % frames_per_row
frame_y = int(frame_number / frames_per_row)
return frame_x * self.frame_width, frame_y * self.frame_height
def get_instance_by_point(self, x: int, y: int) -> Optional[Instance]:
"""Gets the instance that contains the input point.
Args:
x: The x coordinate at the level the instance belongs to.
y: The y coordinate at the level the instance belongs to.
Returns:
The instance that contains the input point, or None if the point is out of
range of the level or is not within any instance.
"""
return self.get_instance_by_frame(self.get_frame_number_by_point(x, y))
@property
def instance_iterator(self) -> Iterator[Instance]:
"""Returns iterator of level's DICOM instances."""
return iter(self.instances.values())
def get_level_sop_instance_uids(self) -> List[str]:
return [
instance.dicom_object.get_value(tags.SOP_INSTANCE_UID)
for instance in self.instances.values()
]
def _wsi_dicom_pyramid_level(
dicom_object: dicom_web_interface.DicomObject,
level_index: LevelIndexType,
frame_number_min: int,
frame_number_max: int,
spacing_diff_tolerance: float,
instance: Optional[Instance] = None,
) -> Level:
instances = {} if instance is None else {instance.frame_offset: instance}
return Level(
level_index=level_index,
width=dicom_object.get_value(tags.TOTAL_PIXEL_MATRIX_COLUMNS),
height=dicom_object.get_value(tags.TOTAL_PIXEL_MATRIX_ROWS),
samples_per_pixel=dicom_object.get_value(tags.SAMPLES_PER_PIXEL),
bits_allocated=dicom_object.get_value(tags.BITS_ALLOCATED),
high_bit=dicom_object.get_value(tags.HIGH_BIT),
pixel_spacing=_get_pixel_spacing(dicom_object, spacing_diff_tolerance),
frame_width=dicom_object.get_value(tags.COLUMNS),
frame_height=dicom_object.get_value(tags.ROWS),
frame_number_min=frame_number_min,
frame_number_max=frame_number_max,
instances=instances,
transfer_syntax_uid=dicom_object.get_value(tags.TRANSFER_SYNTAX_UID),
)
def _create_wsi_label_thumbnail_or_overview_level(
level_index: str,
dicom_object: dicom_web_interface.DicomObject,
spacing_diff_tolerance: float,
) -> Level:
"""Creates a level to represent a label, thumbnail or overview image."""
frame_offset = (
dicom_object.get_value(tags.CONCATENATION_FRAME_OFFSET_NUMBER) or 0
)
if frame_offset != 0:
raise ez_wsi_errors.DicomSlideInitError(
'Label, thumbnail and overview images should have frame offset 0 or'
' have the tag unset.'
)
frame_count = dicom_object.get_value(tags.NUMBER_OF_FRAMES) or 1
if frame_count != 1:
raise ez_wsi_errors.DicomSlideInitError(
'Label, thumbnail and overview images should have frame count 1 or'
' have the tag unset.'
)
return _wsi_dicom_pyramid_level(
dicom_object,
level_index=level_index,
frame_number_min=1,
frame_number_max=1,
spacing_diff_tolerance=spacing_diff_tolerance,
instance=Instance(
frame_offset=0,
frame_count=1,
dicom_object=dicom_object,
),
)
class _LabelOverviewThumbnailImages:
"""Class to hold thumbnail, label and overview images associated with a WSI.
This class adheres to the DICOM standard's Whole Slide Microscopy Image
Flavors as define in table C.8.12.4-2.
"""
def __init__(self):
self.label: Optional[Level] = None
self.thumbnail: Optional[Level] = None
self.overview: Optional[Level] = None
def has_image(self) -> bool:
return (
self.label is not None
or self.thumbnail is not None
or self.overview is not None
)
def _get_key_val_map(self) -> Mapping[str, Optional[Level]]:
return {
_LABEL: self.label,
_THUMBNAIL: self.thumbnail,
_OVERVIEW: self.overview,
}
@classmethod
def _add_level(
cls, inst: _LabelOverviewThumbnailImages, image_type: str, level: Level
) -> None:
"""Adds a label, thumbnail or overview image.
Args:
inst: Instance of the _LabelOverviewThumbnailImages to add the image to.
image_type: The type of image being added.
level: The level(image) to add.
Raises:
ez_wsi_errors.DicomSlideInitError: Multiple images of the same type are
added.
"""
if image_type == _LABEL:
if inst.label is not None:
raise ez_wsi_errors.DicomSlideInitError(
'Slide contains multiple label images.'
)
inst.label = level
elif image_type == _THUMBNAIL:
if inst.thumbnail is not None:
raise ez_wsi_errors.DicomSlideInitError(
'Slide contains multiple thumbnail images.'
)
inst.thumbnail = level
elif image_type == _OVERVIEW:
if inst.overview is not None:
raise ez_wsi_errors.DicomSlideInitError(
'Slide contains multiple overview images.'
)
inst.overview = level
else:
raise ez_wsi_errors.DicomSlideInitError(
f'Unknown image type: {image_type}'
)
@classmethod
def from_dict(
cls, level_map: Optional[Mapping[str, Any]]
) -> _LabelOverviewThumbnailImages:
"""Converts dict representation to _LabelOverviewThumbnailImages."""
images = _LabelOverviewThumbnailImages()
if level_map is None:
return images
for image_type in _LABEL_OVERVIEW_THUMBNAIL_LEVEL_SET:
level_dict = level_map.get(image_type)
if level_dict is None:
continue
_LabelOverviewThumbnailImages._add_level(
images, image_type, Level.from_dict(level_dict)
)
return images
def to_dict(self) -> Mapping[str, Any]:
"""Returns dict representation of _LabelOverviewThumbnailImages."""
return {
key: val.to_dict()
for key, val in self._get_key_val_map().items()
if val is not None
}
@classmethod
def _get_level_index(cls, image_type: str) -> str:
if image_type in _LABEL_OVERVIEW_THUMBNAIL_LEVEL_SET:
return image_type
raise ez_wsi_errors.DicomSlideInitError(f'Unknown image type: {image_type}')
def get_level_by_index(self, level_index: str) -> Optional[Level]:
try:
return self._get_key_val_map().get(
_LabelOverviewThumbnailImages._get_level_index(level_index)
)
except ez_wsi_errors.DicomSlideInitError:
return None
def add_image(
self,
image_type: str,
dicom_object: dicom_web_interface.DicomObject,
pixel_spacing_diff_tolerance: float,
):
"""Adds dicom instance to describing thumbnail, label or overview image.
Args:
image_type: The type of image being added.
dicom_object: The dicom instance to add.
pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
difference between row and column pixel spacings.
Raises:
ez_wsi_errors.DicomSlideInitError: Multiple images of the same type are
added or imaging being added is not a label, thumbnail or overview image.
"""
level = _create_wsi_label_thumbnail_or_overview_level(
_LabelOverviewThumbnailImages._get_level_index(image_type),
dicom_object,
pixel_spacing_diff_tolerance,
)
_LabelOverviewThumbnailImages._add_level(self, image_type, level)
def _order_untiled_level_map(
levels: Mapping[str, Level],
) -> Mapping[str, Level]:
sorted_levels = collections.OrderedDict()
for key in sorted(list(levels)):
sorted_levels[key] = levels[key]
return sorted_levels
def _order_wsi_level_map(levels: Mapping[Any, Level]) -> Mapping[int, Level]:
"""Level map sorted levels based on level index instance frame offset."""
# Sort levels based on level index.
sorted_levels = collections.OrderedDict()
levels_sorted_by_pixel_area = sorted(
[(level, level.height * level.width) for level in levels.values()],
key=lambda x: x[1],
reverse=True,
)
for index, sorted_level in enumerate(levels_sorted_by_pixel_area):
level, _ = sorted_level
# Sort instances in each level based on frame_offset
level = dataclasses.replace(level, level_index=index + 1)
instances = level.instances
sorted_instances = collections.OrderedDict()
instance_offsets = sorted(list(instances.keys()))
for frame_offset in instance_offsets:
sorted_instances[frame_offset] = instances[frame_offset]
frame_number_min = instance_offsets[0] + 1
max_frame_offset = instance_offsets[-1]
frame_number_max = (
max_frame_offset + sorted_instances[max_frame_offset].frame_count
)
sorted_levels[level.level_index] = dataclasses.replace(
level,
frame_number_max=frame_number_max,
frame_number_min=frame_number_min,
instances=sorted_instances,
)
return sorted_levels
@dataclasses.dataclass(frozen=True)
class PyramidLevelsWithClosestPixelSpacing:
equal_or_smaller_pixel_spacing: Optional[Level]
greater_pixel_spacing: Optional[Level]
def _are_pixelspacing_equal(
level: Level, pixel_spacing_mm: float, rel_tol: float
) -> bool:
return math.isclose(
level.pixel_spacing.pixel_spacing_mm, pixel_spacing_mm, rel_tol=rel_tol
)
def _wsi_pyramid_level_key(
dicom_object: dicom_web_interface.DicomObject,
) -> str:
concatenation_uid = dicom_object.get_value(_CONCATENATION_UID)
if concatenation_uid is not None:
return concatenation_uid
return dicom_object.get_value(tags.SOP_INSTANCE_UID)
def _validate_dicom_instance_defines_ez_wsi_required_tags_for_untiled_imaging(
dicom_object: dicom_web_interface.DicomObject,
):
_check_for_tag_or_raise(dicom_object, tags.SOP_INSTANCE_UID, True)
_check_for_tag_or_raise(dicom_object, tags.COLUMNS, True)
_check_for_tag_or_raise(dicom_object, tags.ROWS, True)
_check_for_tag_or_raise(dicom_object, tags.SAMPLES_PER_PIXEL, True)
_check_for_tag_or_raise(dicom_object, tags.BITS_ALLOCATED, True)
_check_for_tag_or_raise(dicom_object, tags.HIGH_BIT, True)
_check_for_tag_or_raise(dicom_object, tags.TRANSFER_SYNTAX_UID, True)
def _validate_dicom_instance_defines_ez_wsi_required_tags_for_wsi_imaging(
dicom_object: dicom_web_interface.DicomObject,
image_type: List[str],
):
"""Validates that the input DICOM instance defines all required tags."""
# Makes sure the following tags have defined values in dicom_object.
_validate_dicom_instance_defines_ez_wsi_required_tags_for_untiled_imaging(
dicom_object
)
_check_for_tag_or_raise(dicom_object, tags.TOTAL_PIXEL_MATRIX_COLUMNS, True)
_check_for_tag_or_raise(dicom_object, tags.TOTAL_PIXEL_MATRIX_ROWS, True)
if image_type is not None and _VOLUME in image_type:
_check_for_tag_or_raise(dicom_object, tags.IMAGE_VOLUME_WIDTH, True)
_check_for_tag_or_raise(dicom_object, tags.IMAGE_VOLUME_HEIGHT, True)
def _get_pixel_spacing(
dicom_object: dicom_web_interface.DicomObject, spacing_diff_tolerance: float
) -> pixel_spacing_module.PixelSpacing:
"""Returns pixel spacing from dicom_object.
Args:
dicom_object: The dicom object to get pixel spacing from.
spacing_diff_tolerance: Pixel spacing difference tolerance.
Returns
Tuple [column spacing, row spacing]
"""
try:
column_spacing = dicom_object.get_value(
tags.IMAGE_VOLUME_WIDTH
) / dicom_object.get_value(tags.TOTAL_PIXEL_MATRIX_COLUMNS)
row_spacing = dicom_object.get_value(
tags.IMAGE_VOLUME_HEIGHT
) / dicom_object.get_value(tags.TOTAL_PIXEL_MATRIX_ROWS)
return pixel_spacing_module.PixelSpacing(
column_spacing,
row_spacing,
spacing_diff_tolerance=spacing_diff_tolerance,
)
except TypeError:
pass
try:
shared_functional_groups = dicom_object.get_list_value(
tags.SHARED_FUNCTIONAL_GROUP_SEQUENCE
)
pixel_measure_sequence = dicom_json.GetList(
shared_functional_groups[0], tags.PIXEL_MEASURES_SEQUENCE
)
spacing = dicom_json.GetList(pixel_measure_sequence[0], tags.PIXEL_SPACING)
row_spacing, column_spacing = spacing
return pixel_spacing_module.PixelSpacing(
column_spacing,
row_spacing,
spacing_diff_tolerance=spacing_diff_tolerance,
)
except (KeyError, ValueError, IndexError, TypeError) as _:
return pixel_spacing_module.UndefinedPixelSpacing(
spacing_diff_tolerance=spacing_diff_tolerance,
)
def _get_ancillary_image_type(image_type: frozenset[str]) -> str:
for im_type in _LABEL_OVERVIEW_THUMBNAIL_LEVEL_SET:
if im_type in image_type:
return im_type
return ''
def _check_for_tag_or_raise(
dicom_object: dicom_web_interface.DicomObject,
tag: tags.DicomTag,
check_no_zero: bool = False,
):
"""Check for the existence of a DICOM tag in a DICOM object.
Args:
dicom_object: The host DICOM object to check against.
tag: The DICOM tag to check for.
check_no_zero: If enabled (true), this method will check if the value of the
tag is non-zero.
Raises:
DicomTagNotFoundError: If the tag does not exist in the object.
InvalidDicomTagError: If check_no_zero is enabled and the tag has zero
value.
"""
value = dicom_object.get_value(tag)
if value is None:
raise ez_wsi_errors.DicomTagNotFoundError(
f'DICOM tag {tag.number} is missing from the DICOM object: '
f'{str(dicom_object.path)}'
)
if check_no_zero and value == 0:
raise ez_wsi_errors.InvalidDicomTagError(
f'DICOM tag {tag.number} cannot have zero value.'
)
def _get_dicom_icc_profile_bytes(dcm: pydicom.Dataset) -> bytes:
if 'OpticalPathSequence' in dcm:
for dataset in dcm.OpticalPathSequence:
if 'ICCProfile' in dataset:
return dataset.ICCProfile
if 'ICCProfile' in dcm:
return dcm.ICCProfile
return b''
def _get_level_icc_profile_bytes(
path: dicom_path.Path,
dwi: dicom_web_interface.DicomWebInterface,
) -> bytes:
"""Returns ICC profile bytes for a pyramid level."""
with io.BytesIO() as dicom_bytes:
dwi.download_instance_untranscoded(
path,
dicom_bytes,
)
dicom_bytes.seek(0)
try:
with pydicom.dcmread(dicom_bytes) as dcm:
return _get_dicom_icc_profile_bytes(dcm)
except pydicom.errors.InvalidDicomError as exp:
raise ez_wsi_errors.DicomInstanceReadError(
'Error reading DICOM instance.'
) from exp
class SlideLevelMap:
"""A class that builds and maintains the level-instance-frame mapping.
When converting a WSI slide to DICOM format, a single WSI slide gets mapped to
one DICOM series, which stores images at multiple levels of magnification with
multiple DICOM instances.
At each level of magnification, the image is divided into multiple frames with
a fixed frame size(i.e. 500).
DICOM tries to store all frames at a magnification level into one DICOM
instance. If the number of frames at that level exceeds the limit, which is
typically 2048, multiple instances are used. In that case, the index of the
first possible frame within an instance is tagged as the frame_offset of that
instance.
"""
def __init__(
self,
dicom_objects: Collection[dicom_web_interface.DicomObject],
pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
):
"""Constructor.
Args:
dicom_objects: DICOM objects of all instances within a DICOM series.
pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
difference between row and column pixel spacings.
Raises:
UnexpectedDicomObjectInstanceError: If any of the input objects is not a
DICOM instance object
NoDicomLevelsDetectedError: No level was detected in the Dicom object.
DicomTagNotFoundError: If any of the required tags is missing from any
DICOM object.
InvalidDicomTagError: If check_no_zero is enabled and the tag has zero
value.
SlideLevelContainsInstancesWithDifferentTransferSyntaxUIDError: Slide
level contains instances with different transfer syntaxes.
"""
self._slide_metadata_lock = threading.RLock()
self._icc_profile_bytes = ICCProfileBytes()
self._spacing_diff_tolerance = pixel_spacing_diff_tolerance
self._level_map, self._label_overview_thumbnail_images = (
self._build_level_map(dicom_objects)
)
self._smallest_level_path = self._get_smallest_level_path()
level_index_list = list(self._level_map)
self.level_index_min = level_index_list[0] if level_index_list else None
self.level_index_max = level_index_list[-1] if level_index_list else None
@classmethod
def create_from_json(
cls,
json_str: Union[str, Mapping[str, Any]],
pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
) -> SlideLevelMap:
"""Creates an SlideLevelMap from a JSON string or dict."""
if not json_str:
return SlideLevelMap([], pixel_spacing_diff_tolerance)
instance = SlideLevelMap.__new__(SlideLevelMap)
instance._slide_metadata_lock = threading.RLock()
instance._icc_profile_bytes = ICCProfileBytes()
instance._spacing_diff_tolerance = pixel_spacing_diff_tolerance
slide_map_metadata = (
json.loads(json_str) if isinstance(json_str, str) else json_str
)
level_map = slide_map_metadata['level_map']
instance._icc_profile_bytes = ICCProfileBytes.from_json(
json.dumps(slide_map_metadata['icc_profile'])
)
instance._label_overview_thumbnail_images = (
_LabelOverviewThumbnailImages.from_dict(
slide_map_metadata['label_overview_thumbnail_images']
)
)
instance._level_map = _order_wsi_level_map(
{int(key): Level.from_dict(level_map[key]) for key in level_map}
)
smallest_level_path = slide_map_metadata['smallest_level_path']
instance._smallest_level_path = (
dicom_path.Path.from_dict(smallest_level_path)
if smallest_level_path
else None
)
level_index_list = list(instance._level_map)
instance.level_index_min = level_index_list[0] if level_index_list else None
instance.level_index_max = (
level_index_list[-1] if level_index_list else None
)
return instance
def __getstate__(self) -> MutableMapping[str, Any]:
"""Returns class state for pickle serialization."""
state = copy.copy(self.__dict__)
del state['_slide_metadata_lock']
return state
def __setstate__(self, dct: MutableMapping[str, Any]) -> None:
"""Init class state from pickle serialization."""
self.__dict__ = dct
self._slide_metadata_lock = threading.RLock()
def _add_wsi_dicom_instance(
self,
levels: MutableMapping[str, Level],
level_instances: MutableMapping[str, MutableMapping[int, Instance]],
label_overview_thumbnail_images: _LabelOverviewThumbnailImages,
dicom_object: dicom_web_interface.DicomObject,
) -> None:
"""Adds a level to represent a wsi pyramid image."""
_check_for_tag_or_raise(dicom_object, tags.IMAGE_TYPE)
image_type = dicom_object.get_list_value(tags.IMAGE_TYPE)
_validate_dicom_instance_defines_ez_wsi_required_tags_for_wsi_imaging(
dicom_object, image_type
)
if image_type is not None and image_type:
image_type = _get_ancillary_image_type(frozenset(image_type))
if image_type:
label_overview_thumbnail_images.add_image(
image_type, dicom_object, self._spacing_diff_tolerance
)
return
level_index = _wsi_pyramid_level_key(dicom_object)
level_index_data = levels.get(level_index)
if level_index_data is None:
levels[level_index] = _wsi_dicom_pyramid_level(
dicom_object,
level_index='undefined',
frame_number_min=-1,
frame_number_max=-1,
spacing_diff_tolerance=self._spacing_diff_tolerance,
)
elif dicom_object.get_value(_CONCATENATION_UID) is None:
raise ez_wsi_errors.DicomSlideInitError(
'Series contains multiple instances with the same SOP instance UID.'
)
elif level_index_data.transfer_syntax_uid != dicom_object.get_value(
tags.TRANSFER_SYNTAX_UID
):
raise ez_wsi_errors.SlideLevelContainsInstancesWithDifferentTransferSyntaxUIDError()
frame_offset = (
dicom_object.get_value(tags.CONCATENATION_FRAME_OFFSET_NUMBER) or 0
)
level_instances[level_index][frame_offset] = Instance(
frame_offset=frame_offset,
frame_count=dicom_object.get_value(tags.NUMBER_OF_FRAMES) or 1,
dicom_object=dicom_object,
)
def _build_level_map(
self,
dicom_objects: Collection[dicom_web_interface.DicomObject],
) -> Tuple[Mapping[int, Level], _LabelOverviewThumbnailImages]:
"""Returns a slide level map built from a set of input DICOM instances.
The algorithm groups instances in multiple sets by level. The result is
stored
in a ordered dictionary, using the level index as the key
Within each level, all instances belonging to that level are stored into a
dictionary, using the frame offset of the instance as the key:
Dict[frame_offset, Instance]
Both the level dictionary and the instance dictionary need to be accessed in
order by the key values. To avoid sorting the keys on-the-fly, we sort
both dictionaries beforehand.
Args:
dicom_objects: DICOM objects to use to build the SlideLevelMap.
Returns:
A sorted dict that contains all pyramid levels of the DICOM object.
_LabelOverviewThumbnailImages
Raises:
UnexpectedDicomObjectInstanceError: If any of the input objects is not a
DICOM instance object.
NoDicomLevelsDetectedError: No level was detected in the Dicom object.
DicomTagNotFoundError: If any of the required tags is missing from any
DICOM
object.
InvalidDicomTagError: If check_no_zero is enabled and the tag has zero
value.
SlideLevelContainsInstancesWithDifferentTransferSyntaxUIDError: Slide
level
contains instances with different transfer syntaxes.
"""
label_overview_thumbnail_images = _LabelOverviewThumbnailImages()
levels = {}
level_instances = collections.defaultdict(dict)
for dicom_object in dicom_objects:
if dicom_object.type() != dicom_path.Type.INSTANCE:
raise ez_wsi_errors.UnexpectedDicomObjectInstanceError(
'SlideLevelMap expects all input DicomObject to have a type of '
f'INSTANCE. Actual: {dicom_object.type()}'
)
_check_for_tag_or_raise(dicom_object, tags.SOP_CLASS_UID)
sop_class_uid = dicom_object.get_value(tags.SOP_CLASS_UID)
if sop_class_uid != VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID:
continue
self._add_wsi_dicom_instance(
levels, level_instances, label_overview_thumbnail_images, dicom_object
)
if not levels and not label_overview_thumbnail_images.has_image():
raise ez_wsi_errors.NoDicomLevelsDetectedError(
f'No level detected from the input DICOM objects {dicom_objects}'
)
for level_index in levels:
levels[level_index] = dataclasses.replace(
levels[level_index], instances=level_instances[level_index]
)
return (_order_wsi_level_map(levels), label_overview_thumbnail_images)
def set_icc_profile_bytes(self, icc_profile_bytes: bytes) -> None:
with self._slide_metadata_lock:
self._icc_profile_bytes.icc_profile_bytes_as_b64_encoded_str = (
base64.b64encode(icc_profile_bytes).decode('utf-8')
)
self._icc_profile_bytes.metadata_state = (
ICCProfileMetadataState.INITIALIZED
)
def is_icc_profile_initialized(self) -> bool:
with self._slide_metadata_lock:
return (
self._icc_profile_bytes.metadata_state
== ICCProfileMetadataState.INITIALIZED
)
def get_icc_profile_bytes(
self,
dwi: dicom_web_interface.DicomWebInterface,
) -> bytes:
"""Returns ICC profile for bytes for a pyramid level."""
with self._slide_metadata_lock:
if self.is_icc_profile_initialized():
return base64.b64decode(
self._icc_profile_bytes.icc_profile_bytes_as_b64_encoded_str.encode(
'utf-8'
)
)
for level in self._level_map.values():
uri = level.icc_profile_bulkdata_uri()
if uri:
icc_profile_bytes = dwi.get_bulkdata(uri)
self.set_icc_profile_bytes(icc_profile_bytes)
return icc_profile_bytes
if self._smallest_level_path is None:
icc_profile_bytes = b''
else:
icc_profile_bytes = _get_level_icc_profile_bytes(
self._smallest_level_path, dwi
)
self.set_icc_profile_bytes(icc_profile_bytes)
return icc_profile_bytes
def _get_smallest_level_path(self) -> Optional[dicom_path.Path]:
"""Returns pyramid_level with least number of frames."""
smallest_level = None
smallest_level_total_pixels = None
for slide_level in self.level_map.values():
frame_count = (
1 + slide_level.frame_number_max - slide_level.frame_number_min
)
frame_pixels = slide_level.frame_width * slide_level.frame_height
total_pixels = frame_count * frame_pixels
if total_pixels <= 0:
continue
if smallest_level is None or smallest_level_total_pixels > total_pixels:
smallest_level = slide_level
smallest_level_total_pixels = total_pixels
if smallest_level is None:
return None
return smallest_level.instances[0].dicom_object.path
def get_json_encoded_icc_profile_size(self) -> int:
with self._slide_metadata_lock:
return len(self._icc_profile_bytes.to_json())
def to_dict(
self,
level_subset: Optional[List[Level]] = None,
max_json_encoded_icc_profile_size: int = DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
) -> MutableMapping[str, Any]:
"""Converts pyramid level map into Dictionary."""
with self._slide_metadata_lock:
level_map = {}
for key, slide_level in self._level_map.items():
if level_subset is None or slide_level in level_subset:
level_map[key] = slide_level.to_dict()
if level_subset is not None:
levels_found_count = len(level_map)
# count levels included in ancillary images
for non_pyramid_level in [self.label, self.overview, self.thumbnail]:
if (
non_pyramid_level is not None
and non_pyramid_level in level_subset
):
levels_found_count += 1
if len(level_subset) != levels_found_count:
raise ez_wsi_errors.LevelNotFoundError('Levels not found in slide.')
icc_profile_json = self._icc_profile_bytes.to_json()
if (
self.get_json_encoded_icc_profile_size()
> max_json_encoded_icc_profile_size
):
# Do not encode large icc profile that could exceed size limts for
# VertexAI (1.5 MB payload limit); icc_profile for Leica WSI can
# exceed 12 MB. If large set value to uninitialized to force decoder to
# re-init icc profile.
icc_profile_json = ICCProfileBytes().to_json()
return {
'level_map': level_map,
'label_overview_thumbnail_images': (
self._label_overview_thumbnail_images.to_dict()
),
'smallest_level_path': (
self._smallest_level_path.to_dict()
if self._smallest_level_path is not None
else {}
),
'icc_profile': json.loads(icc_profile_json),
}
def to_json(
self,
level_subset: Optional[List[Level]] = None,
max_json_encoded_icc_profile_size: int = DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
) -> str:
"""Converts pyramid level map into JSON."""
return json.dumps(
self.to_dict(level_subset, max_json_encoded_icc_profile_size)
)
@property
def level_map(self) -> Mapping[int, Level]:
"""Returns an ordered level map built from a set of input DICOM instances.
The algorithm groups instances in multiple sets by level. The result is
stored in a dictionary, using the level index as the key:
Dict[level_index, Level]
"""
return self._level_map
@property
def thumbnail(self) -> Optional[Level]:
return self._label_overview_thumbnail_images.thumbnail
@property
def overview(self) -> Optional[Level]:
return self._label_overview_thumbnail_images.overview
@property
def label(self) -> Optional[Level]:
return self._label_overview_thumbnail_images.label
def get_level(self, level_index: LevelIndexType) -> Optional[Level]:
"""Returns the level by requested level index.
Args:
level_index: The level index to be required. Level index usually starts at
1.
"""
level = self._level_map.get(level_index)
if level is not None:
return level
return self._label_overview_thumbnail_images.get_level_by_index(level_index)
def are_instances_concatenated(self, instance_uids: List[str]) -> bool:
"""Determines whether all instances in the list are concatenated.
Args:
instance_uids: A list of SOP Instance UIDs to check
Returns:
True if the instances are concatenated or if only one or fewer instance
uids are provided. Otherwise returns False.
"""
instance_uids_set = set(instance_uids)
instance_uid_to_concat_id = {}
for level in self._level_map.values():
for instance in level.instances.values():
if (
instance.dicom_object.get_value(tags.SOP_INSTANCE_UID)
in instance_uids_set
):
if not instance.dicom_object.get_value(_CONCATENATION_UID):
return False
instance_uid_to_concat_id[
instance.dicom_object.get_value(tags.SOP_INSTANCE_UID)
] = instance.dicom_object.get_value(_CONCATENATION_UID)
return (instance_uid_to_concat_id.keys() == instance_uids_set) and (
len(set(instance_uid_to_concat_id.values())) == 1
)
def get_instance_pixel_spacing(
self, instance_uid: str
) -> Optional[pixel_spacing_module.PixelSpacing]:
"""Given an Instance UID retrieves its corresponding PixelSpacing.
Args:
instance_uid: A SOP Instance UID to find
Returns:
PixelSpacing of the instance. Raises if no match
Raises:
PixelSpacingNotFoundForInstanceError if the instance is not in the level
map.
"""
for level in self._level_map.values():
for instance in level.instances.values():
if (
instance.dicom_object.get_value(tags.SOP_INSTANCE_UID)
== instance_uid
):
return level.pixel_spacing
raise ez_wsi_errors.PixelSpacingNotFoundForInstanceError(
f'Instance UID: {instance_uid} did not match any in the level map.'
)
def get_levels_with_closest_pixel_spacing(
self,
pixel_spacing_mm: float,
) -> PyramidLevelsWithClosestPixelSpacing:
"""Returns the level that has the closest pixel spacing as the input.
Args:
pixel_spacing_mm: Pixel spacing of desired imaging.
Returns:
The level that most matches with the requested pixel spacing.
"""
equal_or_smaller_pixel_spacing = None
greater_pixel_spacing = None
# Finds the one or two closest levels to the desired level.
for level in self._level_map.values():
try:
level_spacing = level.pixel_spacing.pixel_spacing_mm
if level_spacing > pixel_spacing_mm:
if (
greater_pixel_spacing is None
or greater_pixel_spacing.pixel_spacing.pixel_spacing_mm
> level_spacing
):
greater_pixel_spacing = level
else:
if (
equal_or_smaller_pixel_spacing is None
or equal_or_smaller_pixel_spacing.pixel_spacing.pixel_spacing_mm
< level_spacing
):
equal_or_smaller_pixel_spacing = level
except ez_wsi_errors.UndefinedPixelSpacingError:
continue
return PyramidLevelsWithClosestPixelSpacing(
equal_or_smaller_pixel_spacing, greater_pixel_spacing
)
def get_level_by_pixel_spacing(
self,
pixel_spacing_mm: float,
relative_pixel_spacing_equality_threshold: float = MAX_LEVEL_DIST,
maximum_downsample: float = 0.0,
) -> Optional[Level]:
"""Returns the level that has the closest pixel spacing as the input.
Args:
pixel_spacing_mm: Pixel spacing of desired imaging.
relative_pixel_spacing_equality_threshold: Maximum relative difference in
(pyramid / level pixel spacing / desired pixel spacing) at which pixel
spacing should be considered equilvalent. E.g., (value of 0.25 =
pyramid levels with pixels spacings that are 25% smaller - 25% larger
are considered equlivalent).
maximum_downsample: Maximum degree to which it is acceptable to downsample
pixel imaging to acchieve a desired target pixel spacing. Only used when
source imaging pixel spacing is < and != to desired target pixel
spacing.
Returns:
The level that closely matches with the requestd pixel spacing, or None
if the requested pixel spacing is out range of any existing levels.
"""
pyramid_levels = self.get_levels_with_closest_pixel_spacing(
pixel_spacing_mm
)
closest_smaller_level = pyramid_levels.equal_or_smaller_pixel_spacing
closest_greater_level = pyramid_levels.greater_pixel_spacing
if closest_smaller_level is not None and _are_pixelspacing_equal(
closest_smaller_level,
pixel_spacing_mm,
relative_pixel_spacing_equality_threshold,
):
return closest_smaller_level
if closest_greater_level is not None and _are_pixelspacing_equal(
closest_greater_level,
pixel_spacing_mm,
relative_pixel_spacing_equality_threshold,
):
return closest_greater_level
if (
closest_smaller_level is None
or pixel_spacing_mm
/ closest_smaller_level.pixel_spacing.pixel_spacing_mm
> maximum_downsample
):
return None
return closest_smaller_level
class UntiledImageMap:
"""A class that builds and maintains the level-instance-frame mapping.
When converting a WSI slide to DICOM format, a single WSI slide gets mapped to
one DICOM series, which stores images at multiple levels of magnification with
multiple DICOM instances.
At each level of magnification, the image is divided into multiple frames with
a fixed frame size(i.e. 500).
DICOM tries to store all frames at a magnification level into one DICOM
instance. If the number of frames at that level exceeds the limit, which is
typically 2048, multiple instances are used. In that case, the index of the
first possible frame within an instance is tagged as the frame_offset of that
instance.
"""
def __init__(
self,
dicom_objects: Collection[dicom_web_interface.DicomObject],
pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
):
"""Constructor.
Args:
dicom_objects: DICOM objects of all instances within a DICOM series.
pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
difference between row and column pixel spacings.
Raises:
UnexpectedDicomObjectInstanceError: If any of the input objects is not a
DICOM instance object
NoDicomLevelsDetectedError: No level was detected in the Dicom object.
DicomTagNotFoundError: If any of the required tags is missing from any
DICOM object.
InvalidDicomTagError: If check_no_zero is enabled and the tag has zero
value.
SlideLevelContainsInstancesWithDifferentTransferSyntaxUIDError: Slide
level contains instances with different transfer syntaxes.
"""
self._slide_metadata_lock = threading.Lock()
self._pixel_spacing_diff_tolerance = pixel_spacing_diff_tolerance
self._icc_profile_level_bytes_cache = cachetools.LRUCache(maxsize=10)
self._untiled_image_map = self._build_untiled_image_map(dicom_objects)
@classmethod
def create_from_json(
cls,
json_str: Union[str, Mapping[str, Any]],
pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
) -> UntiledImageMap:
"""Creates an UntiledImageMap from a JSON string or dict."""
if not json_str:
return UntiledImageMap([], pixel_spacing_diff_tolerance)
instance = UntiledImageMap.__new__(UntiledImageMap)
instance._slide_metadata_lock = threading.Lock()
instance._pixel_spacing_diff_tolerance = pixel_spacing_diff_tolerance
instance._icc_profile_level_bytes_cache = cachetools.LRUCache(maxsize=10)
slide_map_metadata = (
json.loads(json_str) if isinstance(json_str, str) else json_str
)
untiled_image_map = slide_map_metadata['untiled_image_map']
instance._untiled_image_map = _order_untiled_level_map({
key: Level.from_dict(value) for key, value in untiled_image_map.items()
})
return instance
def __getstate__(self) -> MutableMapping[str, Any]:
"""Returns class state for pickle serialization."""
state = copy.copy(self.__dict__)
del state['_slide_metadata_lock']
return state
def __setstate__(self, dct: MutableMapping[str, Any]) -> None:
"""Init class state from pickle serialization."""
self.__dict__ = dct
self._slide_metadata_lock = threading.Lock()
def _create_untiled_image_level(
self, dicom_object: dicom_web_interface.DicomObject
):
"""Adds a level to represent an untiled image."""
frame_offset = (
dicom_object.get_value(tags.CONCATENATION_FRAME_OFFSET_NUMBER) or 0
)
if frame_offset != 0:
raise ez_wsi_errors.DicomSlideInitError(
'Label, thumbnail and overview images should have frame offset 0 or'
' have the tag unset.'
)
frame_count = dicom_object.get_value(tags.NUMBER_OF_FRAMES) or 1
if frame_count != 1:
raise ez_wsi_errors.DicomSlideInitError(
'Label, thumbnail and overview images should have frame count 1 or'
' have the tag unset.'
)
pixel_spacing = dicom_object.get_list_value(tags.PIXEL_SPACING)
if pixel_spacing is not None and pixel_spacing and len(pixel_spacing) == 2:
pixel_spacing = pixel_spacing_module.PixelSpacing(
pixel_spacing[1],
pixel_spacing[0],
spacing_diff_tolerance=self._pixel_spacing_diff_tolerance,
)
else:
pixel_spacing = pixel_spacing_module.UndefinedPixelSpacing(
spacing_diff_tolerance=self._pixel_spacing_diff_tolerance,
)
return Level(
level_index=dicom_object.get_value(tags.SOP_INSTANCE_UID),
width=dicom_object.get_value(tags.COLUMNS),
height=dicom_object.get_value(tags.ROWS),
samples_per_pixel=dicom_object.get_value(tags.SAMPLES_PER_PIXEL),
bits_allocated=dicom_object.get_value(tags.BITS_ALLOCATED),
high_bit=dicom_object.get_value(tags.HIGH_BIT),
pixel_spacing=pixel_spacing,
frame_width=dicom_object.get_value(tags.COLUMNS),
frame_height=dicom_object.get_value(tags.ROWS),
frame_number_min=1,
frame_number_max=1,
instances={
0: Instance(
frame_offset=0,
frame_count=1,
dicom_object=dicom_object,
)
},
transfer_syntax_uid=dicom_object.get_value(tags.TRANSFER_SYNTAX_UID),
)
def _add_untiled_image_dicom_instance(
self,
untiled_imaging: MutableMapping[str, Level],
dicom_object: dicom_web_interface.DicomObject,
) -> None:
_validate_dicom_instance_defines_ez_wsi_required_tags_for_untiled_imaging(
dicom_object
)
untiled_image_level = self._create_untiled_image_level(dicom_object)
untiled_imaging[untiled_image_level.level_index] = untiled_image_level
def _build_untiled_image_map(
self,
dicom_objects: Collection[dicom_web_interface.DicomObject],
) -> Mapping[str, Level]:
"""Returns a slide level map built from a set of input DICOM instances.
The algorithm groups instances in multiple sets by level. The result is
stored
in a ordered dictionary, using the level index as the key
Within each level, all instances belonging to that level are stored into a
dictionary, using the frame offset of the instance as the key:
Dict[frame_offset, Instance]
Both the level dictionary and the instance dictionary need to be accessed in
order by the key values. To avoid sorting the keys on-the-fly, we sort
both dictionaries beforehand.
Args:
dicom_objects: DICOM objects to use to build the SlideLevelMap.
Returns:
A sorted dict that contains all pyramid levels of the DICOM object.
Raises:
UnexpectedDicomObjectInstanceError: If any of the input objects is not a
DICOM instance object.
NoDicomLevelsDetectedError: No level was detected in the Dicom object.
DicomTagNotFoundError: If any of the required tags is missing from any
DICOM
object.
InvalidDicomTagError: If check_no_zero is enabled and the tag has zero
value.
SlideLevelContainsInstancesWithDifferentTransferSyntaxUIDError: Slide
level
contains instances with different transfer syntaxes.
"""
images = {}
for dicom_object in dicom_objects:
if dicom_object.type() != dicom_path.Type.INSTANCE:
raise ez_wsi_errors.UnexpectedDicomObjectInstanceError(
'SlideLevelMap expects all input DicomObject to have a type of '
f'INSTANCE. Actual: {dicom_object.type()}'
)
_check_for_tag_or_raise(dicom_object, tags.SOP_CLASS_UID)
sop_class_uid = dicom_object.get_value(tags.SOP_CLASS_UID)
if sop_class_uid not in UNTILED_IMAGE_SOP_CLASS_UID:
continue
self._add_untiled_image_dicom_instance(images, dicom_object)
if not images:
raise ez_wsi_errors.NoDicomLevelsDetectedError(
'No non-tiled images detected from the input DICOM objects'
f' {dicom_objects}'
)
return _order_untiled_level_map(images)
def to_dict(
self,
level_subset: Optional[List[Level]] = None,
) -> MutableMapping[str, Any]:
"""Converts pyramid level map into Dictionary."""
level_map = {}
for key, slide_level in self._untiled_image_map.items():
if level_subset is None or slide_level in level_subset:
level_map[key] = slide_level.to_dict()
if level_subset is not None and len(level_subset) != len(level_map):
raise ez_wsi_errors.LevelNotFoundError('Levels not found in slide.')
return {
'untiled_image_map': level_map,
}
def to_json(self, level_subset: Optional[List[Level]] = None) -> str:
"""Converts pyramid level map into JSON."""
return json.dumps(self.to_dict(level_subset))
def get_level_icc_profile_bytes(
self,
level: Level,
dwi: dicom_web_interface.DicomWebInterface,
) -> bytes:
"""Returns ICC profile for bytes for a pyramid level."""
with self._slide_metadata_lock:
instance_path = level.instances[0].dicom_object.path
key = instance_path.complete_url
icc_profile = self._icc_profile_level_bytes_cache.get(key)
if icc_profile is not None:
return icc_profile
icc_profile = _get_level_icc_profile_bytes(instance_path, dwi)
self._icc_profile_level_bytes_cache[key] = icc_profile
return icc_profile
@property
def untiled_image_map(self) -> Mapping[str, Level]:
return self._untiled_image_map