# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""An interface for accessing a Pathology slide stored in a DICOMStore.

Whole-Slide Images (WSI) (https://dicom.nema.org/dicom/dicomwsi) in digital
pathology are scans of glass slides typically created at a magnification of
20x or 40x using microscopes. Pixels typically represent between
0.275 µm - 0.5 µm of a slide but the exact size depends on the scanner camera
resolution. With typical dimensions of about 15mm x 15mm for tissue slides,
WSIs are often about 50,000 x 50,000 pixels for a 20x magnification or
100,000 x 100,000 pixels for a 40x magnification. The images are usually stored
as an image pyramid to allow for fast access to different magnification levels.

The class layout/hierarchy is as follows:
DicomSlide: a single (whole) slide image typically represented as
      an image pyramid along with slide level metadata.
   |--> DicomImage: an image at a specific magnification in the image pyramid of
        a slide along with associated metadata.
     |--> DicomImagePatch: a rectangular patch/tile of an Image
"""

from __future__ import annotations

import abc
from collections.abc import Sequence
import copy
import dataclasses
import heapq
import importlib.resources
import io
import itertools
import json
import logging
import math
from typing import Any, Collection, Dict, Iterator, List, Mapping, Optional, Set, Tuple, Union

import cv2
from ez_wsi_dicomweb import credential_factory
from ez_wsi_dicomweb import dicom_frame_decoder
from ez_wsi_dicomweb import dicom_web_interface
from ez_wsi_dicomweb import ez_wsi_errors
from ez_wsi_dicomweb import ez_wsi_logging_factory
from ez_wsi_dicomweb import local_dicom_slide_cache
from ez_wsi_dicomweb import local_dicom_slide_cache_types
from ez_wsi_dicomweb import pixel_spacing as pixel_spacing_module
from ez_wsi_dicomweb import slide_level_map
from ez_wsi_dicomweb.ml_toolkit import dicom_path
from ez_wsi_dicomweb.ml_toolkit import tags
import google.auth
import numpy as np
import PIL
from PIL import ImageCms
import pydicom


# JSON slide metadata keys
# String keys used when slide metadata is represented as JSON.
_SOP_CLASS_UID = 'sop_class_uid'
LEVEL_MAP = 'level_map'
_SLIDE_PATH = 'slide_path'
UNTILED_MICROSCOPE_IMAGE_MAP = 'untiled_microscope_image_map'

# ICC Profile Color Correction
# String constants handed to PIL in this module: _RAW is the decoder name and
# _RGB the image mode used when wrapping pixel bytes in a PIL image; _RGB is
# also the input and output mode of ICC transforms; _SRGB names the profile
# PIL creates when the bundled sRGB profile file is not found.
_RAW = 'raw'
_RGB = 'RGB'
_SRGB = 'sRGB'

# Annotation IOD
# https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE = '1.2.840.10008.5.1.4.1.1.91.1'

# Aliases that expose slide_level_map types under this module's namespace.
ImageDimensions = slide_level_map.ImageDimensions
Level = slide_level_map.Level
ResizedLevel = slide_level_map.ResizedLevel


def _read_icc_profile(dir_name: str, filename: str) -> bytes:
  """Reads a bundled ICC profile from the 'third_party' resource package.

  Args:
    dir_name: Directory, inside the resource package, holding the profile.
    filename: File name of the ICC profile.

  Returns:
    Raw bytes of the ICC profile file.
  """
  # https://setuptools.pypa.io/en/latest/userguide/datafiles.html
  package_root = importlib.resources.files('third_party')
  profile_resource = package_root.joinpath(dir_name, filename)
  return profile_resource.read_bytes()


def get_srgb_icc_profile_bytes() -> bytes:
  """Returns sRGB ICC Profile bytes.

  The bundled sRGB v4 profile is preferred. If the bundled resource cannot be
  located, either because the profile file is missing or because the resource
  package itself is not importable, the sRGB profile created by PIL's ImageCms
  is returned instead.

  Returns:
    Raw bytes of an sRGB ICC profile.
  """
  try:
    return _read_icc_profile('srgb', 'sRGB_v4_ICC_preference.icc')
  except (FileNotFoundError, ModuleNotFoundError):
    # importlib.resources.files() imports the resource package by name and
    # raises ModuleNotFoundError when that package is not installed. Treat
    # this the same as a missing file so the built-in fallback is reachable.
    return ImageCms.ImageCmsProfile(ImageCms.createProfile(_SRGB)).tobytes()


def get_adobergb_icc_profile_bytes() -> bytes:
  """Returns the raw bytes of the bundled AdobeRGB ICC profile file."""
  profile_dir = 'adobergb1998'
  profile_file = 'AdobeRGB1998.icc'
  return _read_icc_profile(profile_dir, profile_file)


def get_rommrgb_icc_profile_bytes() -> bytes:
  """Returns the raw bytes of the bundled ROMM RGB ICC profile file."""
  profile_dir = 'rommrgb'
  profile_file = 'ISO22028-2_ROMM-RGB.icc'
  return _read_icc_profile(profile_dir, profile_file)


def _get_cmsprofile_from_iccprofile_bytes(b: bytes) -> ImageCms.ImageCmsProfile:
  """Opens ICC profile bytes as an ImageCms profile.

  Args:
    b: Raw ICC profile bytes.

  Returns:
    Profile object produced by ImageCms.getOpenProfile.
  """
  profile_stream = io.BytesIO(b)
  return ImageCms.getOpenProfile(profile_stream)


def get_srgb_icc_profile() -> ImageCms.core.CmsProfile:
  """Returns the sRGB ICC profile as an opened ImageCms profile."""
  # NOTE(review): the helper called below is annotated as returning
  # ImageCms.ImageCmsProfile; confirm which annotation is intended.
  profile_bytes = get_srgb_icc_profile_bytes()
  return _get_cmsprofile_from_iccprofile_bytes(profile_bytes)


def get_adobergb_icc_profile() -> ImageCms.core.CmsProfile:
  """Returns the AdobeRGB ICC profile as an opened ImageCms profile."""
  # NOTE(review): the helper called below is annotated as returning
  # ImageCms.ImageCmsProfile; confirm which annotation is intended.
  profile_bytes = get_adobergb_icc_profile_bytes()
  return _get_cmsprofile_from_iccprofile_bytes(profile_bytes)


def get_rommrgb_icc_profile() -> ImageCms.core.CmsProfile:
  """Returns the ROMM RGB ICC profile as an opened ImageCms profile."""
  # NOTE(review): the helper called below is annotated as returning
  # ImageCms.ImageCmsProfile; confirm which annotation is intended.
  profile_bytes = get_rommrgb_icc_profile_bytes()
  return _get_cmsprofile_from_iccprofile_bytes(profile_bytes)


@dataclasses.dataclass(frozen=True)
class _PatchIntersection:
  """Rectangle shared by a patch and a frame, in pixel units.

  Attributes:
    x_origin: The upper leftmost x coordinate of the patch intersection.
    y_origin: The upper leftmost y coordinate of the patch intersection.
    width: Width of the intersection.
    height: Height of the intersection.
  """

  x_origin: int
  y_origin: int
  width: int
  height: int


@dataclasses.dataclass(frozen=True)
class Frame:
  """A frame in a DICOM, in pixel unit.

  Attributes:
    x_origin: The upper leftmost x coordinate of the frame.
    y_origin: The upper leftmost y coordinate of the frame.
    width: Width of the frame.
    height: Height of the frame.
    image_np: Pixel data of the frame.
  """

  x_origin: int
  y_origin: int
  width: int
  height: int
  image_np: np.ndarray


# DICOM transfer syntaxes that do not require transcoding and are natively
# compatible with client side decoding (names per the DICOM UID registry).
_SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS = (
    '1.2.840.10008.1.2.1',  # Explicit VR Little Endian
    '1.2.840.10008.1.2',  # Implicit VR Little Endian
    '1.2.840.10008.1.2.1.99',  # Deflated Explicit VR Little Endian
)


def is_client_side_pixel_decoding_supported(transfer_syntax: str) -> bool:
  """Tests whether pixels in a transfer syntax can be decoded client side.

  A transfer syntax is supported when the frame decoder reports it can
  decompress it, or when it is one of the raw transfer syntaxes listed in
  _SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS.

  Args:
    transfer_syntax: DICOM transfer syntax uid.

  Returns:
    True if client side decoding of the transfer syntax is supported.
  """
  can_decompress = dicom_frame_decoder.can_decompress_dicom_transfer_syntax(
      transfer_syntax
  )
  return (
      can_decompress
      or transfer_syntax in _SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS
  )


def _pixel_spacing_to_level(
    slide: DicomSlide,
    pixel_spacing: pixel_spacing_module.PixelSpacing,
    maximum_downsample: float = 0.0,
) -> slide_level_map.Level:
  """Resolves a pixel spacing to a pyramid level of the slide.

  Logs a notice that selecting source imaging by pixel spacing is slated for
  deprecation, then delegates the lookup to the slide.

  Args:
    slide: Slide whose pyramid is searched.
    pixel_spacing: Pixel spacing to search for.
    maximum_downsample: Forwarded unchanged to the slide's level lookup.

  Returns:
    The level returned by slide.get_level_by_pixel_spacing.

  Raises:
    PixelSpacingLevelNotFoundError: If the slide returns no level.
  """
  logging.info(
      'Pixel spacing parameter will be deprecated in future versions. Modify'
      ' code to define source imaging using slide_level_map.Level.'
  )
  matched_level = slide.get_level_by_pixel_spacing(
      pixel_spacing, maximum_downsample=maximum_downsample
  )
  if matched_level is not None:
    return matched_level
  raise ez_wsi_errors.PixelSpacingLevelNotFoundError(
      'No pyramid level found with pixel spacing:'
      f' ~{pixel_spacing.pixel_spacing_mm}  mm/px'
  )


class DicomImage:
  """An image of a DICOM series at one pyramid level (pixel spacing)."""

  def __init__(
      self,
      level: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
      ],
      source: _DicomSeries,
  ):
    """Initializes the image.

    Args:
      level: Pyramid level this image represents.
      source: The series this image belongs to.
    """
    self._source = source
    self._output_image_level = level

  @property
  def source(self) -> _DicomSeries:
    """Series the image was created from."""
    return self._source

  @property
  def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Pixel spacing of the level the image represents."""
    return self._output_image_level.pixel_spacing

  @property
  def width(self) -> int:
    """Width of the level the image represents."""
    return self._output_image_level.width

  @property
  def height(self) -> int:
    """Height of the level the image represents."""
    return self._output_image_level.height

  def get_image_as_patch(
      self, require_fully_in_source_image: bool = False
  ) -> DicomPatch:
    """Returns a patch anchored at (0, 0) spanning the level's dimensions.

    Args:
      require_fully_in_source_image: Forwarded to the source's get_patch.

    Returns:
      Patch created by the source series.
    """
    level = self._output_image_level
    return self._source.get_patch(
        level,
        x=0,
        y=0,
        width=level.width,
        height=level.height,
        require_fully_in_source_image=require_fully_in_source_image,
    )

  def image_bytes(
      self, color_transform: Optional[ImageCms.ImageCmsTransform] = None
  ) -> np.ndarray:
    """Loads the pixel bytes of the DICOM Image.

    The image is fetched as a single patch that starts at (0, 0) and covers
    the whole level, so the patch implementation does the byte fetching.

    Args:
      color_transform: Optional ICC Profile color transformation to perform on
        image.

    Returns:
      Numpy array representing the DICOM Image.
    """
    whole_image_patch = self.get_image_as_patch()
    return whole_image_patch.image_bytes(color_transform=color_transform)


@dataclasses.dataclass(frozen=True)
class PatchBounds:
  """A bounding rectangle of a patch, in pixel units.

  Attributes:
    x_origin: The upper leftmost x coordinate of the patch.
    y_origin: The upper leftmost y coordinate of the patch.
    width: Width of the patch.
    height: Height of the patch.
  """

  x_origin: int
  y_origin: int
  width: int
  height: int


class BasePatch:
  """A rectangular patch/tile/view of an Image at a specific pixel spacing.

  Stores the upper-left corner and the dimensions of the rectangle.
  """

  def __init__(
      self,
      x: int,
      y: int,
      width: int,
      height: int,
  ):
    """Constructor.

    Args:
      x: X coordinate of the upper-left corner of the patch.
      y: Y coordinate of the upper-left corner of the patch.
      width: Width of the patch.
      height: Height of the patch.
    """
    self._x, self._y = x, y
    self._width, self._height = width, height

  @property
  def x(self) -> int:
    """X coordinate of the upper-left corner."""
    return self._x

  @property
  def y(self) -> int:
    """Y coordinate of the upper-left corner."""
    return self._y

  @property
  def width(self) -> int:
    """Width of the patch."""
    return self._width

  @property
  def height(self) -> int:
    """Height of the patch."""
    return self._height

  @property
  def patch_bounds(self) -> PatchBounds:
    """Bounding rectangle of the patch."""
    return PatchBounds(
        x_origin=self.x, y_origin=self.y, width=self.width, height=self.height
    )

  def is_patch_fully_in_source_image_dim(self, width: int, height: int) -> bool:
    """Tests whether the patch lies entirely inside an image of given size.

    Args:
      width: Width of the image.
      height: Height of the image.

    Returns:
      False if the origin is negative or a patch dimension is not positive;
      otherwise True only if the right and bottom edges of the patch do not
      extend past the image.
    """
    origin_is_negative = self.x < 0 or self.y < 0
    if origin_is_negative or self.width <= 0 or self.height <= 0:
      return False
    right_edge = self.x + self.width
    if right_edge > width:
      return False
    bottom_edge = self.y + self.height
    return bottom_edge <= height


def get_image_bytes_samples_per_pixel(image_bytes: np.ndarray) -> int:
  """Returns the number of samples per pixel in the image.

  Args:
    image_bytes: Uncompressed image bytes (e.g., 8 bit RGB)

  Returns:
    1 for a two dimensional array; the size of the third axis for a three
    dimensional array.

  Raises:
    ez_wsi_errors.GcsImageError: If the image is not 2D or 3D.
  """
  dimension_count = len(image_bytes.shape)
  if dimension_count == 3:
    return image_bytes.shape[2]
  if dimension_count == 2:
    return 1
  raise ez_wsi_errors.GcsImageError(
      f'Invalid image shape: {image_bytes.shape}. Image must be 2D or 3D.'
  )


def transform_image_bytes_color(
    image_bytes: np.ndarray,
    color_transform: Optional[ImageCms.ImageCmsTransform] = None,
) -> np.ndarray:
  """Applies an ICC profile color transformation to image bytes.

  Args:
    image_bytes: Uncompressed image bytes; must be 2D or 3D.
    color_transform: Optional ICC Profile color transformation to apply.

  Returns:
    The input array unchanged when no transformation is given or when the
    image has at most one sample per pixel; otherwise a numpy array holding
    the transformed pixels.
  """
  # Shape is validated first, even when no transformation is requested.
  if (
      get_image_bytes_samples_per_pixel(image_bytes) <= 1
      or color_transform is None
  ):
    return image_bytes
  rows, columns = image_bytes.shape[0:2]
  # PIL takes the image size as (width, height).
  pil_image = PIL.Image.frombuffer(
      _RGB,
      (columns, rows),
      image_bytes.tobytes(),
      decoder_name=_RAW,
  )
  ImageCms.applyTransform(pil_image, color_transform, inPlace=True)
  return np.asarray(pil_image)


class _SlidePyramidLevelPatch(BasePatch):
  """A rectangular patch/tile/view of an Image at a specific pixel spacing.

  A Patch's data is composed from its overlap with one or more DICOM Frames.
  Patch representation here represents a region of pixels as stored in DICOM.
  Coordinates and dimensions are in pixels of the level given to the
  constructor; this class does not resize imaging.
  """

  def __init__(
      self,
      source: _DicomSeries,
      level: slide_level_map.Level,
      x: int,
      y: int,
      width: int,
      height: int,
  ):
    """Constructor.

    Args:
      source: The source DICOM series this patch belongs to.
      level: Pyramid Level source imaging is derived from.
      x: The X coordinate of the starting point (upper-left corner) of the
        patch.
      y: The Y coordinate of the starting point (upper-left corner) of the
        patch.
      width: The width of the patch.
      height: The height of the patch.

    Raises:
      LevelNotFoundError: If source.has_level(level) is false.
    """
    super().__init__(x, y, width, height)
    self._level = level
    self._source = source
    if not source.has_level(level):
      raise ez_wsi_errors.LevelNotFoundError(
          f'The level {level.level_index} is not found in the slide.'
      )

  def is_patch_fully_in_source_image(self) -> bool:
    """Returns True if the patch lies entirely within the level's dimensions."""
    return self.is_patch_fully_in_source_image_dim(
        self._level.width, self._level.height
    )

  def get_pyramid_imaging_source_level(self) -> slide_level_map.Level:
    """Returns the pyramid level the patch reads its pixels from."""
    return self._level

  @property
  def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Pixel spacing of the level the patch reads its pixels from."""
    return self._level.pixel_spacing

  def __eq__(self, other: Any) -> bool:
    """Compares position, dimensions, source, and level.

    Source and level are compared by identity first and by equality second.
    Note that defining __eq__ without __hash__ leaves instances unhashable.

    Args:
      other: Object to compare against.

    Returns:
      True if other is a _SlidePyramidLevelPatch describing the same region of
      the same level of the same source.
    """
    if not isinstance(other, _SlidePyramidLevelPatch):
      return False
    return (
        self.x == other.x
        and self.y == other.y
        and self.width == other.width
        and self.height == other.height
        and (self._source is other._source or self._source == other._source)
        and (self._level is other._level or self._level == other._level)
    )

  @property
  def source(self) -> _DicomSeries:
    """The DICOM series the patch belongs to."""
    return self._source

  def _get_intersection(
      self,
      frame_x_cord: int,
      frame_y_cord: int,
      frame_width: int,
      frame_height: int,
  ) -> _PatchIntersection:
    """Returns intersection from the source patch and rectangular coordinate.

    Args:
      frame_x_cord: Source frame x coordinate (upper left).
      frame_y_cord: Source frame y coordinate (upper left).
      frame_width: Source frame width
      frame_height: Source frame height

    Returns:
      Intersection between patches and rectangular coordinate.

    Raises:
      PatchIntersectionNotFoundError if there is no overlap between the source
      and the destination patches.
    """
    # The intersection starts at the larger of the two origins and ends at the
    # smaller of the two far edges.
    x = max(frame_x_cord, self.x)
    y = max(frame_y_cord, self.y)
    width = min(frame_x_cord + frame_width - x, self.x + self.width - x)
    height = min(frame_y_cord + frame_height - y, self.y + self.height - y)
    if width <= 0 or height <= 0:
      raise ez_wsi_errors.PatchIntersectionNotFoundError(
          'There is no overlap region between the source and the destination'
          ' patches.'
      )
    return _PatchIntersection(
        x_origin=x, y_origin=y, width=width, height=height
    )

  def _copy_overlapped_region(
      self, src_frame: Frame, dst_image_np: np.ndarray
  ) -> Tuple[int, int]:
    """Copies the overlapped region from the source patch to the dest nd.array.

    Args:
      src_frame: the source frame to be copied from.
      dst_image_np: the destination np to be copied to.

    Returns:
      The width and height of the overlapped region gets copied.

    Raises:
      PatchIntersectionNotFoundError if there is no overlap between the source
      and the destination patches.
    """
    intersection = self._get_intersection(
        src_frame.x_origin,
        src_frame.y_origin,
        src_frame.width,
        src_frame.height,
    )
    x = intersection.x_origin
    y = intersection.y_origin
    width = intersection.width
    height = intersection.height
    # The intersection origin is translated into frame-relative coordinates for
    # the read and into patch-relative coordinates for the write.
    # pylint: disable=protected-access
    _copy_ndarray(
        src_frame.image_np,
        x - src_frame.x_origin,
        y - src_frame.y_origin,
        width,
        height,
        dst_image_np,
        x - self.x,
        y - self.y,
    )
    # pylint: enable=protected-access
    return width, height

  def frame_numbers(self) -> Iterator[int]:
    """Generates slide level frame numbers required to render patch.

    Frame numbering starts at 1.

    The patch area is scanned left to right, top to bottom, and is clipped to
    the level's width and height. After each frame is found the scan advances
    by the size of that frame's overlap with the patch.

    Yields:
      A generator that produces frame numbers.

    Raises:
      DicomSlideMissingError if slide used to create self is None.
    """
    if self.source is None:
      raise ez_wsi_errors.DicomSlideMissingError(
          'Unable to get image pixels. Parent slide is None.'
      )
    y = self.y
    x = self.x
    width = self.width
    height = self.height
    cy = y
    frame_width = self._level.frame_width
    frame_height = self._level.frame_height
    # Instance that holds the most recently looked up frame.
    cached_instance = None
    # Last frame number seen since cached_instance was set; used to skip
    # frame numbers that do not increase within the same instance.
    last_instance_frame_number = -math.inf
    while cy < y + height and cy < self._level.height:
      cx = x
      region_height = 0
      while cx < x + width and cx < self._level.width:
        try:
          frame_number = self._level.get_frame_number_by_point(cx, cy)
          # Look up the instance again only when the frame number lies past
          # the end of the cached instance's frame range.
          if (
              cached_instance is None
              or frame_number - cached_instance.frame_offset
              >= cached_instance.frame_count
          ):
            cached_instance = self._level.get_instance_by_frame(frame_number)
            last_instance_frame_number = -math.inf
            if cached_instance is None:
              raise ez_wsi_errors.FrameNumberOutofBoundsError()
          if frame_number > last_instance_frame_number:
            yield frame_number
          last_instance_frame_number = frame_number
          pos_x, pos_y = self._level.get_frame_position(frame_number)
          intersection = self._get_intersection(
              pos_x, pos_y, frame_width, frame_height
          )
          # Advance by the part of this frame that overlaps the patch.
          region_width = intersection.width
          region_height = intersection.height
        except ez_wsi_errors.EZWsiError:
          # No frame found at (cx, cy), move 1 pixel in both X, and Y direction.
          region_width = 1
          region_height = max(1, region_height)
        cx += region_width
      cy += region_height

  def image_bytes(
      self, color_transform: Optional[ImageCms.ImageCmsTransform] = None
  ) -> np.ndarray:
    """Returns the patch's image bytes.

    The result starts as a zero filled array of shape
    (height, width, samples_per_pixel); pixels not covered by any retrieved
    frame keep the value zero.

    Args:
      color_transform: Optional ICC Profile color transformation to apply to on
        image bytes.

    Returns:
      Numpy array type image.

    Raises:
      DicomSlideMissingError if slide used to create self is None.
      SectionOutOfImageBoundsError if no pixels could be copied from any frame
      into the patch.
    """
    if self._source is None:
      raise ez_wsi_errors.DicomSlideMissingError(
          'Unable to get image pixels. Parent slide is None.'
      )
    image_bytes = np.zeros(
        (self.height, self.width, self._level.samples_per_pixel),
        self._level.pixel_format,
    )
    pixel_copied = False
    # Copies image pixels from all overlapped frames.
    for frame_number in self.frame_numbers():
      frame = self.source.get_frame(self._level, frame_number)
      if frame is None:
        # Frames the source does not return are skipped.
        continue
      try:
        self._copy_overlapped_region(frame, image_bytes)
        pixel_copied = True
      except ez_wsi_errors.EZWsiError:
        # A frame that cannot be copied is skipped rather than failing the
        # whole patch.
        continue
    if not pixel_copied:
      raise ez_wsi_errors.SectionOutOfImageBoundsError(
          'The requested patch is out of scope of the image.'
      )
    return transform_image_bytes_color(image_bytes, color_transform)

  def get_gcp_data_credential_header(
      self, credential: Optional[google.auth.credentials.Credentials] = None
  ) -> Dict[str, str]:
    """Returns the credential header obtained from the source series.

    Args:
      credential: Optional credential forwarded to the source series.

    Returns:
      Result of the source's get_credential_header call.
    """
    return self.source.get_credential_header(credential)


def _scale_coordinate(pt: int, source_width: int, dest_width: int) -> int:
  """Maps a coordinate from the source dimension to the destination one.

  Args:
    pt: Coordinate in the source dimension.
    source_width: Size of the source dimension.
    dest_width: Size of the destination dimension.

  Returns:
    pt * dest_width / source_width, truncated toward zero.
  """
  scaled_numerator = int(pt) * int(dest_width)
  return int(scaled_numerator / int(source_width))


def _bottom_offset_padding(pt: int, source_width: int, dest_width: int) -> int:
  """Returns starting offset when upsampling.

  When downsampling, or when sizes match, the offset is 0. When upsampling,
  the destination coordinate is projected onto the source dimension and the
  fractional part of the projection is converted back into a count of
  destination pixels.

  Args:
    pt: Coordinate of pixel in destination imaging.
    source_width: Width of source imaging.
    dest_width: Width of destination imaging.

  Returns:
    0 if imaging is not being upsampled; otherwise the non-negative pixel
    offset of pt within the upsampled source pixel that contains it.
  """
  if source_width >= dest_width:
    return 0
  source_size = int(source_width)
  dest_size = int(dest_width)
  scale = source_size / dest_size
  projected = float(int(pt) * source_size / dest_size)
  fraction = projected - math.floor(projected)
  offset = int(fraction / scale)
  return offset if offset > 0 else 0


def _top_offset_padding(pt: int, source_width: int, dest_width: int) -> bool:
  """Returns true if last upsampled pixel partially falls on image.

  When downsampling, or when sizes match, the result is False. When
  upsampling, the destination coordinate is projected onto the source
  dimension; the result is True when the distance from the projection up to
  the next whole source pixel amounts to at least one destination pixel.

  Args:
    pt: Coordinate of pixel in destination imaging.
    source_width: Width of source imaging.
    dest_width: Width of destination imaging.

  Returns:
    False if imaging is not being upsampled or True if last pixel only
    partially falls within image patch.
  """
  if source_width >= dest_width:
    return False
  source_size = int(source_width)
  dest_size = int(dest_width)
  scale = source_size / dest_size
  projected = float(int(pt) * source_size / dest_size)
  remainder = math.ceil(projected) - projected
  return int(remainder / scale) > 0


class DicomPatch(_SlidePyramidLevelPatch):
  """A rectangular patch/tile/view of an Image at a specific pixel spacing.

  Abstraction over, _SlidePyramidLevelPatch that supports resizing source
  pyramid imaging to target pixel spacing.

  Patch coordinates and dimensions are expressed in the destination level.
  When they differ from their mapping into the source level, pixels are read
  through an internal _SlidePyramidLevelPatch covering the mapped source
  region and are then resized to the requested dimensions.
  """

  def __init__(
      self,
      source_image_level: slide_level_map.Level,
      x: int,
      y: int,
      width: int,
      height: int,
      source: _DicomSeries,
      destination_image_level: Union[
          slide_level_map.Level, slide_level_map.ResizedLevel, None
      ] = None,
      require_fully_in_source_image: bool = False,
  ):
    """Constructor.

    Args:
      source_image_level: Level that slide source imaging is generated from.
      x: The X coordinate of the starting point (upper-left corner) of the
        generated patch in destination image level coordinates.
      y: The Y coordinate of the starting point (upper-left corner) of the
        generated patch in destination image level coordinates.
      width: The width of the generated patch in destination image.
      height: The height of the generated patch in destination image.
      source: The parent DICOM Slide this patch belongs to.
      destination_image_level: Level that patch represents if undefined defaults
        to source level.
      require_fully_in_source_image: Require patch be fully in image

    Raises:
      DicomPatchGenerationError: If the source level is not tiled_full and has
        more than one frame.
      PatchOutsideOfImageDimensionsError: If require_fully_in_source_image is
        set and the patch is not fully within the source image.
    """
    if (
        not source_image_level.tiled_full
        and source_image_level.number_of_frames != 1
    ):
      raise ez_wsi_errors.DicomPatchGenerationError(
          'DICOM instance(s) do not have TILED_FULL Dimension Organization'
          ' Type.'
      )
    super().__init__(source, source_image_level, x, y, width, height)
    if destination_image_level is None:
      destination_image_level = source_image_level
    self._destination_image_level = destination_image_level
    # Map the patch corners from destination level coordinates into source
    # level coordinates.
    source_start_x = _scale_coordinate(
        x, destination_image_level.width, source_image_level.width
    )
    source_start_y = _scale_coordinate(
        y, destination_image_level.height, source_image_level.height
    )
    source_end_x = _scale_coordinate(
        x + width, destination_image_level.width, source_image_level.width
    )
    source_end_y = _scale_coordinate(
        y + height, destination_image_level.height, source_image_level.height
    )
    if (
        source_start_x == x
        and source_start_y == y
        and source_end_x == x + width
        and source_end_y == y + height
    ):
      # patch imaging is not being resized.
      self._resized_patch_source = None
      self._pixel_spacing = source_image_level.pixel_spacing
      self._bottom_x_offset_pad = 0  # Not used in if dimensions unchanged.
      self._bottom_y_offset_pad = 0  # Not used
      self._upsample_target_width = width  # Not used
      self._upsample_target_height = height  # Not used
    else:
      # patch imaging is being resized.

      # determine offsets for image upsampling.  NOPs for downsampling.
      self._bottom_x_offset_pad = _bottom_offset_padding(
          x, source_image_level.width, destination_image_level.width
      )
      self._bottom_y_offset_pad = _bottom_offset_padding(
          y, source_image_level.height, destination_image_level.height
      )
      # if patch bounds end before end of image and there is an additional
      # pixel in source which fractionally falls on the patch, then include.
      if source_end_x < source_image_level.width and _top_offset_padding(
          x + width, source_image_level.width, destination_image_level.width
      ):
        source_end_x += 1
      # if patch bounds end before end of image and there is an additional
      # pixel in source which fractionally falls on the patch, then include.
      if source_end_y < source_image_level.height and _top_offset_padding(
          y + height,
          source_image_level.height,
          destination_image_level.height,
      ):
        source_end_y += 1

      self._pixel_spacing = destination_image_level.pixel_spacing
      source_width = source_end_x - source_start_x
      source_height = source_end_y - source_start_y
      # Region of the source level whose pixels are read and then resized.
      self._resized_patch_source = _SlidePyramidLevelPatch(
          source,
          source_image_level,
          source_start_x,
          source_start_y,
          source_width,
          source_height,
      )
      # determine dimensions to rescale upsampled image to before clipping
      self._upsample_target_width = _scale_coordinate(
          source_start_x + source_width,
          source_image_level.width,
          destination_image_level.width,
      ) - _scale_coordinate(
          source_start_x,
          source_image_level.width,
          destination_image_level.width,
      )
      self._upsample_target_height = _scale_coordinate(
          source_start_y + source_height,
          source_image_level.height,
          destination_image_level.height,
      ) - _scale_coordinate(
          source_start_y,
          source_image_level.height,
          destination_image_level.height,
      )
    if (
        require_fully_in_source_image
        and not self.is_patch_fully_in_source_image()
    ):
      raise ez_wsi_errors.PatchOutsideOfImageDimensionsError(
          'A portion of the patch does not overlap the image.'
      )
    self._require_fully_in_source_image = require_fully_in_source_image

  def __eq__(self, other: Any) -> bool:
    """Compares position, dimensions, source, source level, and destination.

    Source and levels are compared by identity first and by equality second.

    Args:
      other: Object to compare against.

    Returns:
      True if other is a DicomPatch describing the same region.
    """
    if not isinstance(other, DicomPatch):
      return False
    return (
        self.x == other.x
        and self.y == other.y
        and self.width == other.width
        and self.height == other.height
        and (self.source is other.source or self.source == other.source)
        and (self._level is other._level or self._level == other._level)
        and (
            self._destination_image_level is other._destination_image_level
            or self._destination_image_level == other._destination_image_level
        )
    )

  @property
  def id(self) -> str:
    """String identifier built from magnification, dimensions, and position.

    The identifier is prefixed with the source's accession number when the
    source defines one.
    """
    if not self.source or not self.source.accession_number:
      return (
          f'M_{self.pixel_spacing.as_magnification_string}:'
          f'{self.width:06d}x{self.height:06d}{self.x:+07d}{self.y:+07d}'
      )
    # Be consistent with internal id format.
    # "%s:%06dx%06d%+07d%+07d", image_id, width, height, left, top
    return (
        f'{self.source.accession_number}:M_{self.pixel_spacing.as_magnification_string}:'
        f'{self.width:06d}x{self.height:06d}{self.x:+07d}{self.y:+07d}'
    )

  def is_patch_fully_in_source_image(self) -> bool:
    """Returns True if the region read from the source level is fully in it.

    For resized patches the test is made on the mapped source region.
    """
    if self._resized_patch_source is None:
      return super().is_patch_fully_in_source_image()
    return self._resized_patch_source.is_patch_fully_in_source_image()

  @property
  def pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Pixel spacing of the patch.

    This is the destination level's pixel spacing for resized patches and the
    source level's pixel spacing otherwise.
    """
    return self._pixel_spacing

  def frame_numbers(self) -> Iterator[int]:
    """Returns source level frame numbers required to render the patch."""
    if self._resized_patch_source is None:
      return super().frame_numbers()
    # returns frame numbers in source image level.
    return self._resized_patch_source.frame_numbers()

  @property
  def is_resized(self) -> bool:
    """True if patch pixels are produced by resizing source imaging."""
    return self._resized_patch_source is not None

  @property
  def level(
      self,
  ) -> Union[slide_level_map.Level, slide_level_map.ResizedLevel]:
    """Level patch bytes are generated with respect to."""
    return self._destination_image_level

  def image_bytes(
      self, color_transform: Optional[ImageCms.ImageCmsTransform] = None
  ) -> np.ndarray:
    """Returns the patch's image bytes, resized if required.

    For resized patches the color transformation is applied to the source
    region's bytes before they are resized.

    Args:
      color_transform: Optional ICC Profile color transformation to apply to
        image bytes.

    Returns:
      Numpy array type image.
    """
    if self._resized_patch_source is None:
      # no change to byte dimensions.
      return super().image_bytes(color_transform=color_transform)
    source_image_bytes = self._resized_patch_source.image_bytes(
        color_transform=color_transform
    )
    # Treated as upsampling when the patch is larger than the source region
    # in either dimension.
    if (
        self.width > self._resized_patch_source.width
        or self.height > self._resized_patch_source.height
    ):
      # upsample bytes
      pixels = cv2.resize(
          source_image_bytes,
          (self._upsample_target_width, self._upsample_target_height),
          interpolation=cv2.INTER_CUBIC,
      )
      # clip regions of upsampled imaging not falling in patch.
      return pixels[
          self._bottom_y_offset_pad : (self.height + self._bottom_y_offset_pad),
          self._bottom_x_offset_pad : (self.width + self._bottom_x_offset_pad),
          ...,
      ]
    else:
      # Downsample bytes
      return cv2.resize(
          source_image_bytes,
          (self.width, self.height),
          interpolation=cv2.INTER_AREA,
      )

  def get_patch(
      self,
      x: int,
      y: int,
      width: int,
      height: int,
      require_fully_in_source_image: Optional[bool] = None,
  ) -> DicomPatch:
    """Returns a patch based from the existing patch.

    The new patch shares this patch's source, source level, and destination
    level.

    Args:
      x: X coordinate of the upper-left corner of the new patch.
      y: Y coordinate of the upper-left corner of the new patch.
      width: Width of the new patch.
      height: Height of the new patch.
      require_fully_in_source_image: Whether the new patch must be fully in
        the source image; None reuses the value this patch was created with.

    Returns:
      The new DicomPatch.
    """
    require_fully_in_source_image = (
        self._require_fully_in_source_image
        if require_fully_in_source_image is None
        else require_fully_in_source_image
    )
    return DicomPatch(
        self._level,
        x,
        y,
        width,
        height,
        self.source,
        self._destination_image_level,
        require_fully_in_source_image=require_fully_in_source_image,
    )


def _get_native_level(
    slm: slide_level_map.SlideLevelMap,
) -> Optional[slide_level_map.Level]:
  """Gets the native level from a SlideLevelMap.

  The native level of a slide has the lowest level index.

  Args:
    slm: The source SlideLevelMap to get the native level from.

  Returns:
    The level that has the lowest index in the slide, or None if the map
    reports no minimum level index or has no level at that index.
  """
  lowest_index = slm.level_index_min
  if lowest_index is None:
    return None
  return slm.get_level(lowest_index)


def _copy_ndarray(
    src_array: np.ndarray,
    src_x: int,
    src_y: int,
    width: int,
    height: int,
    dst_array: np.ndarray,
    dst_x: int,
    dst_y: int,
) -> None:
  """Copies a rectangular region of one ndarray into another ndarray.

  Both arrays are indexed as [row (y), column (x), ...]. The destination array
  is modified in place.

  Args:
    src_array: The source ndarray to copy the sub-array from.
    src_x: The X coordinate of the starting point to copy from, in the source
      array.
    src_y: The Y coordinate of the starting point to copy from, in the source
      array.
    width: The width of the sub-array to copy.
    height: The height of the sub-array to copy.
    dst_array: The destination ndarray to copy the sub-array into.
    dst_x: The X coordinate of the starting point to copy to, in the destination
      array.
    dst_y: The Y coordinate of the starting point to copy, in the destination
      array.

  Raises:
    SectionOutOfImageBoundsError if the sub-array to copy is out of scope of the
    source array or the destination array.
  """
  src_rows, src_cols = src_array.shape[:2]
  dst_rows, dst_cols = dst_array.shape[:2]
  src_x_end = src_x + width
  src_y_end = src_y + height
  source_in_bounds = (
      src_x >= 0
      and src_y >= 0
      and src_x_end <= src_cols
      and src_y_end <= src_rows
  )
  if not source_in_bounds:
    raise ez_wsi_errors.SectionOutOfImageBoundsError(
        'The sub-array to copy is out of the scope of the source array.'
    )
  dst_x_end = dst_x + width
  dst_y_end = dst_y + height
  destination_in_bounds = (
      dst_x >= 0
      and dst_y >= 0
      and dst_x_end <= dst_cols
      and dst_y_end <= dst_rows
  )
  if not destination_in_bounds:
    raise ez_wsi_errors.SectionOutOfImageBoundsError(
        'The sub-array to copy is out of the scope of the destination array.'
    )
  region = src_array[src_y:src_y_end, src_x:src_x_end]
  dst_array[dst_y:dst_y_end, dst_x:dst_x_end] = region


def create_icc_profile_transformation(
    source_image_icc_profile_bytes: bytes,
    dest_icc_profile: Union[bytes, ImageCms.core.CmsProfile],
    rendering_intent: ImageCms.Intent = ImageCms.Intent.PERCEPTUAL,
) -> Optional[ImageCms.ImageCmsTransform]:
  """Builds a color transform from the source ICC profile to a target profile.

  Args:
    source_image_icc_profile_bytes: Raw bytes of the source image ICC profile.
    dest_icc_profile: Target ICC profile, either as raw bytes or as an already
      opened ImageCms profile.
    rendering_intent: Rendering intent passed to ImageCms.buildTransform.

  Returns:
    The ImageCms transform, or None if either the source profile bytes or the
    destination profile is empty/None.
  """
  if not (source_image_icc_profile_bytes and dest_icc_profile):
    return None
  source_profile = ImageCms.getOpenProfile(
      io.BytesIO(source_image_icc_profile_bytes)
  )
  if isinstance(dest_icc_profile, bytes):
    # Raw profile bytes must be opened before a transform can be built.
    target_profile = ImageCms.getOpenProfile(io.BytesIO(dest_icc_profile))
  else:
    target_profile = dest_icc_profile
  # The module-level _RGB mode is used for both the input and output image.
  return ImageCms.buildTransform(
      source_profile,
      target_profile,
      _RGB,
      _RGB,
      renderingIntent=rendering_intent,
  )


def _get_level_series_path(
    level: Union[slide_level_map.Level, slide_level_map.ResizedLevel],
) -> dicom_path.Path:
  """Returns the series path of the first instance stored on the level.

  A ResizedLevel is resolved to its source level first. Because the first
  instance is fetched with next() and no default, a level without any
  instances raises StopIteration.

  Args:
    level: Level or resized level to return the series path for.

  Returns:
    Series path of the level's first instance.
  """
  if isinstance(level, slide_level_map.ResizedLevel):
    source_level = level.source_level
  else:
    source_level = level
  first_instance = next(iter(source_level.instances.values()))
  return first_instance.path.GetSeriesPath()


class _DicomSeries(metaclass=abc.ABCMeta):
  """DICOM images."""

  def __init__(
      self,
      dwi: dicom_web_interface.DicomWebInterface,
      path: dicom_path.Path,
      enable_client_slide_frame_decompression: bool = True,
      accession_number: Optional[str] = None,
      logging_factory: Optional[
          ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
      ] = None,
      slide_frame_cache: Optional[
          local_dicom_slide_cache.InMemoryDicomSlideCache
      ] = None,
  ):
    """Constructor.

    Args:
      dwi: DicomWebInterface configured with access to the series referenced by
        path.
      path: Path to a DICOM series object in a DICOMStore.
      enable_client_slide_frame_decompression: True enables the client side
        frame decompression optimization (recommended). The parameter is
        slated for removal following GG validation.
      accession_number: Accession number of the slide.
      logging_factory: Factory EZ WSI uses to construct its logging interface.
        If None, a BasePythonLoggerFactory using the default EZ WSI logger
        name is created.
      slide_frame_cache: Optional shared slide frame cache to use.
    """
    self._dwi = dwi
    self.path = path
    self.accession_number = accession_number
    self._slide_frame_cache = slide_frame_cache
    self._enable_client_slide_frame_decompression = (
        enable_client_slide_frame_decompression
    )
    # The logger itself is created lazily by the `logger` property.
    self._logger = None
    if logging_factory is not None:
      self._logging_factory = logging_factory
    else:
      self._logging_factory = ez_wsi_logging_factory.BasePythonLoggerFactory(
          ez_wsi_logging_factory.DEFAULT_EZ_WSI_PYTHON_LOGGER_NAME
      )

  @abc.abstractmethod
  def json_metadata_dict(
      self,
      level_subset: Optional[List[slide_level_map.Level]] = None,
      max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
  ) -> Mapping[str, Any]:
    """Returns JSON dict metadata for the series.

    Abstract; subclasses must provide the implementation. json_metadata()
    serializes the returned mapping with json.dumps.

    Args:
      level_subset: Optional list of levels; presumably limits the metadata to
        these levels (confirm against the concrete implementations).
      max_json_encoded_icc_profile_size: Presumably the maximum size, in bytes,
        of an ICC profile embedded in the metadata (confirm against the
        concrete implementations).

    Returns:
      Mapping holding the series metadata.
    """

  def json_metadata(
      self,
      level_subset: Optional[List[slide_level_map.Level]] = None,
      max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
  ) -> str:
    """Returns the series metadata encoded as a JSON string.

    Args:
      level_subset: Forwarded unchanged to json_metadata_dict.
      max_json_encoded_icc_profile_size: Forwarded unchanged to
        json_metadata_dict.

    Returns:
      json.dumps encoding of the mapping returned by json_metadata_dict.
    """
    metadata = self.json_metadata_dict(
        level_subset=level_subset,
        max_json_encoded_icc_profile_size=max_json_encoded_icc_profile_size,
    )
    return json.dumps(metadata)

  def get_credentials(self) -> google.auth.credentials.Credentials:
    """Returns the credentials provided by the series' DICOM web interface."""
    dicom_web = self._dwi
    return dicom_web.credentials()

  def get_credential_header(
      self,
      credential: Optional[google.auth.credentials.Credentials] = None,
  ) -> Dict[str, str]:
    """Returns the credential header used to retrieve data from the DICOM store.

    Args:
      credential: Credentials to build the header from. A supplied credential
        is refreshed through credential_factory first; if None, the credentials
        returned by get_credentials() are used.

    Returns:
      Header dict populated by the credential's apply() method.
    """
    if credential is not None:
      credential_factory.refresh_credentials(credential)
    else:
      credential = self.get_credentials()
    auth_headers = {}
    credential.apply(auth_headers)
    return auth_headers

  def get_patch_bounds_dicom_instance_frame_numbers(
      self,
      image_level: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
      ],
      patch_bounds_list: List[PatchBounds],
  ) -> Mapping[str, List[int]]:
    """Returns Map[DICOM instance path: frame numbers] needed by the patches.

    Args:
      image_level: Level that the patch bounds are defined on.
      patch_bounds_list: List of PatchBounds to return frame numbers for.

    Returns:
      Mapping from the string form of each DICOM instance path to the
      de-duplicated list of instance frame numbers required to render the
      patches. Frames that have no backing instance are omitted.
    """
    if isinstance(image_level, slide_level_map.ResizedLevel):
      source_level = image_level.source_level
    else:
      source_level = image_level
    # frame_numbers() is expected to return each patch's frame numbers in
    # sorted order, which is what heapq.merge requires below.
    per_patch_frame_numbers = [
        DicomPatch(
            source_level,
            bounds.x_origin,
            bounds.y_origin,
            bounds.width,
            bounds.height,
            self,
            image_level,
        ).frame_numbers()
        for bounds in patch_bounds_list
    ]
    frames_by_instance_path = {}
    current_instance = None
    pending_frame_numbers = []
    # heapq.merge yields one sorted stream from the pre-sorted lists; the
    # stream may contain duplicates, which are dropped when appending.
    for slide_frame_number in heapq.merge(*per_patch_frame_numbers):
      if current_instance is not None:
        local_frame_number = (
            current_instance.instance_frame_number_from_wholes_slide_frame_number(
                slide_frame_number
            )
        )
        if local_frame_number <= current_instance.frame_count:
          # Frame still belongs to the current instance.
          is_duplicate = (
              pending_frame_numbers
              and pending_frame_numbers[-1] == local_frame_number
          )
          if not is_duplicate:
            pending_frame_numbers.append(local_frame_number)
          continue
      # Frame lies beyond the current instance (or there is none yet): store
      # what was collected so far and look up the instance holding the frame.
      if pending_frame_numbers:
        frames_by_instance_path[str(current_instance.dicom_object.path)] = (
            pending_frame_numbers
        )
      current_instance = source_level.get_instance_by_frame(slide_frame_number)
      if current_instance is None:
        pending_frame_numbers = []
      else:
        pending_frame_numbers = [
            current_instance.instance_frame_number_from_wholes_slide_frame_number(
                slide_frame_number
            )
        ]
    if pending_frame_numbers:
      frames_by_instance_path[str(current_instance.dicom_object.path)] = (
          pending_frame_numbers
      )
    return frames_by_instance_path

  def preload_level_in_frame_cache(
      self,
      level: Union[slide_level_map.Level, slide_level_map.ResizedLevel],
      blocking: bool = True,
  ):
    """Preloads an entire level into the frame cache.

    Does nothing if the series has no frame cache.

    Args:
      level: Level to load; a resized level is resolved to its source level.
      blocking: Whether the call should block until loading is complete.

    Returns:
      None

    Raises:
      ez_wsi_errors.LevelNotFoundError: The level's series path does not match
        the series path of this DICOM slide.
    """
    frame_cache = self.slide_frame_cache
    if frame_cache is None:
      return
    source_level = (
        level.source_level
        if isinstance(level, slide_level_map.ResizedLevel)
        else level
    )
    # The level must belong to the same series as this object.
    level_series_path = _get_level_series_path(source_level)
    if level_series_path != self.path.GetSeriesPath():
      raise ez_wsi_errors.LevelNotFoundError(
          'Level path does not match DICOM slide path.'
      )
    frame_cache.cache_whole_instance_in_memory(source_level, blocking)

  def preload_patches_in_frame_cache(
      self,
      patch_seq: Union[Sequence[DicomPatch], DicomPatch],
      blocking: bool = True,
      copy_from_cache: Optional[
          local_dicom_slide_cache.InMemoryDicomSlideCache
      ] = None,
  ) -> None:
    """Pre-loads the frames backing a sequence of DICOM patches.

    Consecutive patches that share a level are grouped and submitted to the
    frame cache together. Does nothing if the series has no frame cache or
    patch_seq is empty.

    Args:
      patch_seq: Patch or sequence of patches to load into the frame cache.
      blocking: Whether the call should block until loading is complete.
      copy_from_cache: Optional cache to copy frames from to avoid
        re-downloading.

    Returns:
      None

    Raises:
      ez_wsi_errors.LevelNotFoundError: A patch's level is not on the series
        of this DICOM slide.
    """
    frame_cache = self.slide_frame_cache
    if frame_cache is None or not patch_seq:
      return
    if isinstance(patch_seq, DicomPatch):
      patch_seq = [patch_seq]
    slide_series_path = self.path.GetSeriesPath()

    def _preload_group(group_level, group_patches) -> None:
      # Validates the group's level, then queues the group's frames.
      if _get_level_series_path(group_level) != slide_series_path:
        raise ez_wsi_errors.LevelNotFoundError(
            'Patch path does not match DICOM slide path.'
        )
      instance_frame_map = self.get_patch_bounds_dicom_instance_frame_numbers(
          group_level, [p.patch_bounds for p in group_patches]
      )
      frame_cache.preload_instance_frame_numbers(
          instance_frame_map, copy_from_cache
      )

    group_level = None
    group_patches = []
    for patch in patch_seq:
      # A change of level closes the group collected so far.
      if group_level is not None and not group_level == patch.level:
        _preload_group(group_level, group_patches)
        group_patches = []
      group_patches.append(patch)
      group_level = patch.level
    if group_patches:
      _preload_group(group_level, group_patches)
    if blocking:
      frame_cache.block_until_frames_are_loaded()

  @property
  def logger(self) -> ez_wsi_logging_factory.AbstractLoggingInterface:
    """Returns the logging interface, creating it on first access."""
    active_logger = self._logger
    if active_logger is None:
      active_logger = self._logging_factory.create_logger()
      self._logger = active_logger
    return active_logger

  @property
  def slide_frame_cache(
      self,
  ) -> Optional[local_dicom_slide_cache.InMemoryDicomSlideCache]:
    """Returns the DICOM slide frame cache in use, or None if there is none."""
    frame_cache = self._slide_frame_cache
    return frame_cache

  @slide_frame_cache.setter
  def slide_frame_cache(
      self,
      slide_frame_cache: Optional[
          local_dicom_slide_cache.InMemoryDicomSlideCache
      ],
  ) -> None:
    """Sets the DICOM slide frame cache used by the slide.

    Shared caches configured with max_cache_frame_memory_lru_cache_size_bytes
    can be used to limit total cache memory utilization across multiple stores.
    Sharing non-LRU frame caches is not recommended.

    Args:
      slide_frame_cache: Reference to the slide frame cache, or None to use no
        cache.
    """
    self._slide_frame_cache = slide_frame_cache

  def init_slide_frame_cache(
      self,
      max_cache_frame_memory_lru_cache_size_bytes: Optional[int] = None,
      number_of_frames_to_read: int = local_dicom_slide_cache.DEFAULT_NUMBER_OF_FRAMES_TO_READ_ON_CACHE_MISS,
      max_instance_number_of_frames_to_prefer_whole_instance_download: int = local_dicom_slide_cache.MAX_INSTANCE_NUMBER_OF_FRAMES_TO_PREFER_WHOLE_INSTANCE_DOWNLOAD,
      optimization_hint: local_dicom_slide_cache_types.CacheConfigOptimizationHint = local_dicom_slide_cache_types.CacheConfigOptimizationHint.MINIMIZE_DICOM_STORE_QPM,
  ) -> local_dicom_slide_cache.InMemoryDicomSlideCache:
    """Creates a new frame cache and installs it on the series.

    Any previously installed frame cache reference is replaced.

    Args:
      max_cache_frame_memory_lru_cache_size_bytes: Maximum size of the cache in
        bytes. Ideally in the range of hundreds of megabytes to gigabytes. If
        None, the cache size is not limited.
      number_of_frames_to_read: Number of frames to read on a cache miss.
      max_instance_number_of_frames_to_prefer_whole_instance_download: Largest
        instance frame count for which downloading the whole instance is
        preferred over retrieving frames in batch (typically faster for small
        instances). The optimal threshold depends on the average size of the
        instance frame data and the size of the non-frame instance metadata.
      optimization_hint: Whether the cache should minimize data latency or
        total queries to the DICOM store.

    Returns:
      The newly created frame cache.
    """
    frame_cache = local_dicom_slide_cache.InMemoryDicomSlideCache(
        credential_factory=self._dwi.credential_factory,
        max_cache_frame_memory_lru_cache_size_bytes=max_cache_frame_memory_lru_cache_size_bytes,
        number_of_frames_to_read=number_of_frames_to_read,
        max_instance_number_of_frames_to_prefer_whole_instance_download=max_instance_number_of_frames_to_prefer_whole_instance_download,
        optimization_hint=optimization_hint,
        logging_factory=self._logging_factory,
    )
    self._slide_frame_cache = frame_cache
    return frame_cache

  def remove_slide_frame_cache(self) -> None:
    """Removes the slide frame cache by clearing the series' reference to it."""
    self._slide_frame_cache = None

  @property
  def dwi(self) -> dicom_web_interface.DicomWebInterface:
    """Returns the DicomWebInterface the series was constructed with."""
    dicom_web = self._dwi
    return dicom_web

  def _get_cached_frame_bytes(
      self,
      instance: slide_level_map.Instance,
      frame_number: int,
  ) -> Optional[bytes]:
    """Returns frame bytes from the frame cache when the cache can be used.

    Args:
      instance: Instance to return the frame from.
      frame_number: Instance frame number to return.

    Returns:
      The result of the cache's get_frame(), or None if there is no frame cache
      or the instance's transfer syntax is not supported for client side pixel
      decoding.
    """
    frame_cache = self._slide_frame_cache
    if frame_cache is None:
      return None
    dicom_object = instance.dicom_object
    transfer_syntax_uid = dicom_object.get_value(tags.TRANSFER_SYNTAX_UID)
    if not is_client_side_pixel_decoding_supported(transfer_syntax_uid):
      return None
    return frame_cache.get_frame(
        dicom_object.path,
        instance.frame_count,
        frame_number,
    )

  def _get_frame_bytes_from_server(
      self,
      instance: slide_level_map.Instance,
      instance_frame_index: int,
      transcoding: dicom_web_interface.TranscodeDicomFrame,
  ) -> bytes:
    """Returns frame bytes requested from the DICOM server.

    When a frame cache is present, bytes previously fetched for the same
    instance, frame and transcoding are returned from it, and newly fetched
    bytes are stored in it.

    Args:
      instance: DICOM instance.
      instance_frame_index: Frame index to retrieve.
      transcoding: How the server should transcode the DICOM frame.

    Returns:
      Frame bytes.
    """
    instance_path = instance.dicom_object.path
    frame_cache = self._slide_frame_cache
    if frame_cache is None:
      return self.dwi.get_frame_image(
          instance_path, instance_frame_index, transcoding
      )
    # The key identifies the instance, the frame and the transcoding applied.
    cache_key = (
        f'i:{str(instance_path)} f:{instance_frame_index} t:{transcoding.value}'
    )
    cached_bytes = frame_cache.get_cached_externally_acquired_bytes(cache_key)
    if cached_bytes is not None:
      return cached_bytes
    fetched_bytes = self.dwi.get_frame_image(
        instance_path, instance_frame_index, transcoding
    )
    frame_cache.cache_externally_acquired_bytes(cache_key, fetched_bytes)
    return fetched_bytes

  def _get_frame_client_transcoding(
      self,
      instance: slide_level_map.Instance,
      frame_number: int,
  ) -> Optional[np.ndarray]:
    """Returns a DICOM frame decoded on the client from its stored encoding.

    The frame bytes come from the frame cache when available; otherwise they
    are requested from the server without transcoding.

    Args:
      instance: DICOM instance within the level to return the frame from.
      frame_number: Frame number within the instance to return.

    Returns:
      The result of decode_dicom_compressed_frame_bytes for the frame. The
      annotated type allows None, which get_frame treats as a failed decode.
    """
    frame_bytes = self._get_cached_frame_bytes(instance, frame_number)
    if frame_bytes is None:
      frame_bytes = self._get_frame_bytes_from_server(
          instance,
          frame_number,
          dicom_web_interface.TranscodeDicomFrame.DO_NOT_TRANSCODE,
      )
    transfer_syntax_uid = instance.dicom_object.get_value(
        tags.TRANSFER_SYNTAX_UID
    )
    return dicom_frame_decoder.decode_dicom_compressed_frame_bytes(
        frame_bytes, transfer_syntax_uid
    )

  def _get_frame_server_transcoding(
      self,
      level: slide_level_map.Level,
      instance: slide_level_map.Instance,
      frame_number: int,
  ) -> np.ndarray:
    """Returns a DICOM frame built from uncompressed little endian bytes.

    Cached bytes are used only if the level's transfer syntax is one of the
    supported raw transfer syntaxes; otherwise the server is asked to transcode
    the frame to uncompressed little endian.

    Args:
      level: WSI pyramid level.
      instance: DICOM instance within the level to return the frame from.
      frame_number: Frame number within the instance to return.

    Returns:
      Numpy array of shape (frame_height, frame_width, samples_per_pixel)
      viewing the frame bytes.
    """
    # Cached bytes are only usable if the frame is stored as uncompressed
    # little endian.
    stored_as_raw = (
        level.transfer_syntax_uid
        in _SUPPORTED_CLIENT_SIDE_DECODING_RAW_TRANSFER_SYNTAXS
    )
    if stored_as_raw:
      raw_bytes = self._get_cached_frame_bytes(instance, frame_number)
    else:
      raw_bytes = None
    if raw_bytes is None:
      raw_bytes = self._get_frame_bytes_from_server(
          instance,
          frame_number,
          dicom_web_interface.TranscodeDicomFrame.UNCOMPRESSED_LITTLE_ENDIAN,
      )
    frame_shape = (
        level.frame_height,
        level.frame_width,
        level.samples_per_pixel,
    )
    expected_byte_count = int(
        level.frame_height
        * level.frame_width
        * level.samples_per_pixel
        * np.dtype(level.pixel_format).itemsize
    )
    # Drop trailing bytes if the DICOM buffer is padded.
    if len(raw_bytes) > expected_byte_count:
      raw_bytes = raw_bytes[:expected_byte_count]
    pixels = np.frombuffer(raw_bytes, level.pixel_format)
    return pixels.reshape(frame_shape)

  def get_frame(
      self,
      level: slide_level_map.Level,
      frame_number: int,
  ) -> Optional[Frame]:
    """Gets a single frame from a pyramid level.

    The DICOMWeb API serves image pixels in units of frames. Frames have a
    fixed size (width and height). Call get_patch() instead to get an image
    patch at a specific location or with specific dimensions.

    Client side decoding is tried first when it is enabled and the level's
    transfer syntax can be decompressed; if it is not used or yields None, the
    frame is obtained through server side transcoding.

    Args:
      level: Source pyramid level of the frame.
      frame_number: The frame number to fetch; must lie within
        [level.frame_number_min, level.frame_number_max].

    Returns:
      The requested frame, or None if no instance holds the frame.

    Raises:
      InputFrameNumberOutOfRangeError if the input frame_number is
      out of range.
    """
    if not level.frame_number_min <= frame_number <= level.frame_number_max:
      raise ez_wsi_errors.InputFrameNumberOutOfRangeError(
          f'frame_number value [{frame_number}] is out of range: '
          f'[{level.frame_number_min}, {level.frame_number_max}]'
      )
    instance = level.get_instance_by_frame(frame_number)
    if instance is None:
      # A frame may not exist in DICOMStore if it contains no tissue pixels.
      return None
    instance_frame_number = (
        instance.instance_frame_number_from_wholes_slide_frame_number(
            frame_number
        )
    )
    pos_x, pos_y = level.get_frame_position(frame_number)
    decode_on_client = (
        self._enable_client_slide_frame_decompression
        and dicom_frame_decoder.can_decompress_dicom_transfer_syntax(
            level.transfer_syntax_uid
        )
    )
    if decode_on_client:
      pixels = self._get_frame_client_transcoding(
          instance, instance_frame_number
      )
    else:
      pixels = None
    if pixels is None:
      # Either client decoding was not attempted or the bytes could not be
      # decoded (likely the actual pixel encoding does not match the DICOM
      # transfer syntax); fall back to server side transcoding.
      pixels = self._get_frame_server_transcoding(
          level, instance, instance_frame_number
      )
    return Frame(
        x_origin=pos_x,
        y_origin=pos_y,
        width=level.frame_width,
        height=level.frame_height,
        image_np=pixels,
    )

  def get_image(
      self,
      pixel_spacing: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
      ],
  ) -> DicomImage:
    """Returns a DicomImage for the given level or resized level.

    Args:
      pixel_spacing: Despite its name, a Level or ResizedLevel; it is passed to
        DicomImage together with this series.
    """
    image = DicomImage(pixel_spacing, self)
    return image

  def get_patch(
      self,
      level: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
      ],
      x: int,
      y: int,
      width: int,
      height: int,
      require_fully_in_source_image: bool = False,
  ) -> DicomPatch:
    """Gets a patch from a specific slide level.

    The area of a patch is defined by its position (x, y) and its dimensions
    (width, height). The position is the patch's first pixel, i.e. the pixel
    with the smallest coordinates. All values are expressed in the pixel space
    of the requested level.

    The patch is created on-the-fly by sampling pixels from all frames that
    overlap it.

    Args:
      level: Level to generate the patch from. For a resized level the patch
        reads from the resized level's source level.
      x: The X coordinate of the patch position in level.
      y: The Y coordinate of the patch position in level.
      width: The width of the patch in level.
      height: The height of the patch in level.
      require_fully_in_source_image: Require that the patch dimensions fully
        exist in level.

    Returns:
      The requested patch.
    """
    if isinstance(level, slide_level_map.ResizedLevel):
      source_level = level.source_level
    else:
      source_level = level
    return DicomPatch(
        source_level,
        x,
        y,
        width,
        height,
        self,
        level,
        require_fully_in_source_image=require_fully_in_source_image,
    )

  @abc.abstractmethod
  def get_level_by_index(
      self, index: slide_level_map.LevelIndexType
  ) -> Optional[slide_level_map.Level]:
    """Returns the level for the requested level index.

    Abstract; subclasses must provide the implementation. has_level() relies
    on this lookup and compares its result by identity.

    Args:
      index: Index of the level to return.

    Returns:
      The matching level; the Optional return type indicates None when no
      level matches (confirm against the concrete implementations).
    """

  def has_level(self, level: slide_level_map.Level) -> bool:
    """Returns True if level is the very object registered at its level index.

    The comparison is by identity, not equality.

    Args:
      level: Level to test.
    """
    registered_level = self.get_level_by_index(level.level_index)
    return registered_level is level

  @abc.abstractmethod
  def are_instances_concatenated(self, instance_uids: list[str]) -> bool:
    """Returns True if the instances provided are concatenated.

    Abstract; subclasses must provide the implementation.

    Args:
      instance_uids: UIDs of the instances to test; presumably SOP Instance
        UIDs (confirm against the concrete implementations).
    """

  @property
  @abc.abstractmethod
  def all_levels(self) -> Iterator[slide_level_map.Level]:
    """Returns an iterator over all levels of the series.

    Abstract; subclasses must provide the implementation.
    """

  def get_instance_level(
      self, sop_instance_uid: str
  ) -> Optional[slide_level_map.Level]:
    """Returns the first level holding an instance with the given UID.

    Args:
      sop_instance_uid: SOP Instance UID to search for.

    Returns:
      The first level from all_levels containing an instance whose SOP
      Instance UID equals sop_instance_uid, or None if there is none.
    """
    for level in self.all_levels:
      level_has_instance = any(
          candidate.dicom_object.get_value(tags.SOP_INSTANCE_UID)
          == sop_instance_uid
          for candidate in level.instances.values()
      )
      if level_has_instance:
        return level
    return None

  def _filter_dicom_object(
      self,
      instance: dicom_web_interface.DicomObject,
      slide_instances_uid_set: set[str],
      filter_by_annotation_iod: Optional[str],
      filter_by_operator_id: Optional[str],
  ) -> bool:
    """Filters instances by annotation IOD, slide reference, and operator ID.

    An instance passes if it satisfies all of the following:
      * Its SOP Class UID equals filter_by_annotation_iod (when provided).
      * It references the slide: it is in the slide's series, or any item of
        its Referenced Series Sequence names the slide's series, or any item of
        its Referenced Image Sequence names one of the slide's instances.
      * Any Person Identification Code Sequence item of any Operator
        Identification Sequence item has a Long Code Value equal to
        filter_by_operator_id (when provided).

    Missing or empty sequences, and sequence items lacking the inspected
    attribute, count as "no match" rather than raising.

    Args:
      instance: DICOM object whose JSON tags are tested.
      slide_instances_uid_set: SOP Instance UIDs of the slide's instances.
      filter_by_annotation_iod: SOP Class UID to require; falsy disables the
        check.
      filter_by_operator_id: Operator ID to require; falsy disables the check.

    Returns:
      True if the instance passes all filters, False otherwise.
    """
    ds = pydicom.Dataset.from_json(instance.dicom_tags)

    # Filter by provided IOD.
    if filter_by_annotation_iod and ds.SOPClassUID != filter_by_annotation_iod:
      return False

    # Sentinel so a missing attribute can never compare equal to a real UID.
    missing = object()
    slide_series_uid = self.path.series_uid

    # Check if the instance references the slide's series or one of its
    # instances. All sequence items are inspected; iterating (instead of
    # indexing item 0) also makes an empty sequence a clean "no match".
    # https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/common-instance-reference/00081115/0020000e
    # https://dicom.innolitics.com/ciods/microscopy-bulk-simple-annotations/microscopy-bulk-simple-annotations/00081140/00081155
    references_slide = (
        ds.SeriesInstanceUID == slide_series_uid
        or any(
            getattr(item, 'SeriesInstanceUID', missing) == slide_series_uid
            for item in getattr(ds, 'ReferencedSeriesSequence', None) or ()
        )
        or any(
            getattr(item, 'ReferencedSOPInstanceUID', missing)
            in slide_instances_uid_set
            for item in getattr(ds, 'ReferencedImageSequence', None) or ()
        )
    )
    if not references_slide:
      return False

    # Filter by operator ID.
    if not filter_by_operator_id:
      return True
    for operator in getattr(ds, 'OperatorIdentificationSequence', None) or ():
      person_codes = (
          getattr(operator, 'PersonIdentificationCodeSequence', None) or ()
      )
      for person_code in person_codes:
        if getattr(person_code, 'LongCodeValue', missing) == (
            filter_by_operator_id
        ):
          return True
    return False

  def _find_annotation_instances(
      self,
      levels: Iterator[slide_level_map.Level],
      annotation_dicom_store: Optional[dicom_path.Path] = None,
      filter_by_annotation_iod: Optional[
          str
      ] = MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE,
      filter_by_operator_id: Optional[str] = None,
  ) -> Iterator[dicom_path.Path]:
    """Returns an iterator over the paths of a slide's annotation instances.

    The annotation instances must either reference the slide's series UID or
    one of its instances, or have the same series UID as the slide.

    Args:
      levels: Levels whose instances the annotations may reference.
      annotation_dicom_store: The DICOM store to search for annotation
        instances; defaults to the store of this series. The study UID is
        assumed to be the same in the slide and annotation DICOM stores.
      filter_by_annotation_iod: The annotation IOD to filter by. Default is
        Microscopy-Bulk-Simple-Annotations.
        https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
      filter_by_operator_id: The operator ID to filter by. This searches the
        Operator Identification Sequence long code value for the provided ID.
        https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/general-series/00081072/00401101/00080119

    Returns:
      A lazily evaluated iterator over the paths of the matching annotation
      instances.
    """
    store_path = annotation_dicom_store
    if store_path is None:
      store_path = self.path.GetStorePath()

    if store_path is None:
      # Assume annotations are stored in the same dicom store as the images.
      study_path = self.path.GetStudyPath()
    else:
      # Point the study path at the annotation store.
      study_path = dicom_path.FromPath(
          store_path,
          study_uid=self.path.study_uid,
          series_uid='',
          instance_uid='',
      )

    # Query every annotation-modality ('ANN') instance of the study.
    # https://www.dicomlibrary.com/dicom/modality
    candidate_instances = self.dwi.get_instances(study_path, modality='ANN')

    # SOP Instance UIDs of every instance on the supplied levels.
    slide_instance_uids = {
        slide_instance.dicom_object.get_value(tags.SOP_INSTANCE_UID)
        for level in levels
        for slide_instance in level.instances.values()
    }

    # Keep only the relevant instances, then map each one to its path.
    matching_instances = (
        candidate
        for candidate in candidate_instances
        if self._filter_dicom_object(
            candidate,
            slide_instance_uids,
            filter_by_annotation_iod,
            filter_by_operator_id,
        )
    )
    return (
        dicom_path.FromPath(
            study_path,
            study_uid=matched.get_value(tags.STUDY_INSTANCE_UID),
            series_uid=matched.get_value(tags.SERIES_INSTANCE_UID),
            instance_uid=matched.get_value(tags.SOP_INSTANCE_UID),
        )
        for matched in matching_instances
    )


class DicomSlide(_DicomSeries):
  """Represents a DICOM pathology slide stored in a DICOMStore."""

  def __init__(
      self,
      dwi: dicom_web_interface.DicomWebInterface,
      path: dicom_path.Path,
      enable_client_slide_frame_decompression: bool = True,
      accession_number: Optional[str] = None,
      pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
      logging_factory: Optional[
          ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
      ] = None,
      slide_frame_cache: Optional[
          local_dicom_slide_cache.InMemoryDicomSlideCache
      ] = None,
      json_metadata: Union[str, Mapping[str, Any]] = '',
      instances: Optional[Collection[dicom_web_interface.DicomObject]] = None,
  ):
    """Builds the slide's level map from JSON metadata or DICOM instances.

    The input path is reduced to its series path before use. When non-empty
    JSON metadata is supplied the level map is restored from it; otherwise the
    level map is built from `instances`, or from the instances returned by
    `dwi.get_instances` for the series when `instances` is None.

    Args:
      dwi: The DicomWebInterface that has been configured to be able to access
        the series referenced by the input path.
      path: The path to a DICOM series object in a DICOMStore.
      enable_client_slide_frame_decompression: Set to True to enable client side
        frame decompression optimization (Recommended value = True); remove
        parameter following GG validation.
      accession_number: The accession_number of the slide.
      pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
        difference between row and column pixel spacings.
      logging_factory: The factory that EZ WSI uses to construct a logging
        interface.
      slide_frame_cache: Initialize to use shared slide cache.
      json_metadata: Optional slide level metadata, either a JSON string or an
        already decoded mapping. Empty values (before or after decoding) are
        ignored.
      instances: Optional collection of instances to use to initialize the
        slide when no JSON metadata is given.

    Raises:
      ez_wsi_errors.InvalidSlideJsonMetadataError: The JSON string cannot be
        decoded, or reading the decoded metadata raised TypeError, IndexError,
        or KeyError.
      ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError: The slide path
        stored in the metadata differs from the series path.
    """
    series_path = path.GetSeriesPath()
    super().__init__(
        dwi,
        series_path,
        enable_client_slide_frame_decompression,
        accession_number,
        logging_factory,
        slide_frame_cache,
    )
    # Decode string metadata first; the decoded value may itself be empty.
    if json_metadata and isinstance(json_metadata, str):
      try:
        json_metadata = json.loads(json_metadata)
      except json.JSONDecodeError as exp:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata.'
        ) from exp
    if not json_metadata:
      # No usable metadata: derive the level map from DICOM instances.
      source_instances = (
          dwi.get_instances(series_path) if instances is None else instances
      )
      self._level_map = slide_level_map.SlideLevelMap(
          source_instances,
          pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      )
    else:
      try:
        expected_path = str(series_path)
        metadata_path = str(
            dicom_path.Path.from_dict(json_metadata[_SLIDE_PATH])
        )
        if expected_path != metadata_path:
          raise ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError(
              'Slide path does not match slide path in json metadata.'
          )
        self._level_map = slide_level_map.SlideLevelMap.create_from_json(
            json_metadata[LEVEL_MAP],
            pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
        )
      except (TypeError, IndexError, KeyError) as exp:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Incorrectly formatted JSON metadata.'
        ) from exp
    self._native_level = _get_native_level(self._level_map)

  def __copy__(self) -> DicomSlide:
    """Returns a shallow copy of the slide that owns a copy of the level map."""
    # Bypass __init__; all attributes are shared except the level map, which
    # is copied so the duplicate does not alias the original's level map.
    duplicate = DicomSlide.__new__(DicomSlide)
    state = dict(vars(self))
    state['_level_map'] = copy.copy(self._level_map)
    vars(duplicate).update(state)
    return duplicate

  @property
  def native_level(self) -> slide_level_map.Level:
    """Returns the slide's native level.

    Raises:
      ez_wsi_errors.LevelNotFoundError: No native level was found for the
        slide's level map.
    """
    level = self._native_level
    if level is not None:
      return level
    raise ez_wsi_errors.LevelNotFoundError(
        'Slide does not define pyramid imaging.'
    )

  @property
  def total_pixel_matrix_columns(self) -> int:
    """Returns the width of the slide's native level."""
    native = self.native_level
    return native.width

  @property
  def total_pixel_matrix_rows(self) -> int:
    """Returns the height of the slide's native level."""
    native = self.native_level
    return native.height

  @property
  def pixel_format(self) -> np.dtype:
    """Returns the pixel format of the slide's native level."""
    native = self.native_level
    return native.pixel_format

  @property
  def native_pixel_spacing(self) -> pixel_spacing_module.PixelSpacing:
    """Returns the pixel spacing of the slide's native level."""
    native = self.native_level
    return native.pixel_spacing

  def set_icc_profile_bytes(self, icc_profile_bytes: bytes) -> None:
    """Stores ICC Profile bytes on the slide's level map.

    Args:
      icc_profile_bytes: ICC Profile bytes to associate with the pyramid.
    """
    level_map = self._level_map
    level_map.set_icc_profile_bytes(icc_profile_bytes)

  def get_json_encoded_icc_profile_size(self) -> int:
    """Returns the level map's JSON encoded ICC profile size."""
    level_map = self._level_map
    return level_map.get_json_encoded_icc_profile_size()

  def get_icc_profile_bytes(self) -> bytes:
    """Returns ICC Profile bytes for the pyramid.

    The slide's DICOM web interface is passed to the level map for the lookup.
    """
    level_map = self._level_map
    return level_map.get_icc_profile_bytes(self._dwi)

  def are_instances_concatenated(self, instance_uids: list[str]) -> bool:
    """Tests whether the provided instances are concatenated.

    The check is delegated to the slide's level map. Concatenated instances
    also share the same pixel spacing.

    Args:
      instance_uids: A list of SOP Instance UIDs to check.

    Returns:
      True if the instances are concatenated or if only one instance uid is
        provided. Otherwise returns False.
    """
    level_map = self._level_map
    return level_map.are_instances_concatenated(instance_uids)

  def get_patch_bounds_dicom_instance_frame_numbers(
      self,
      image_level: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
          pixel_spacing_module.PixelSpacing,
      ],
      patch_bounds_list: List[PatchBounds],
  ) -> Mapping[str, List[int]]:
    """Returns Map[DICOM instances: frame numbers] that fall in patch bounds.

    A PixelSpacing argument is first converted to a level of this slide; the
    lookup itself is performed by the base class implementation.

    Args:
      image_level: Level, resized level, or pixel spacing that the patches
        represent.
      patch_bounds_list: List of PatchBounds to return frame indexes for.

    Returns:
      Mapping between DICOM instances, path, and list of frames numbers required
      to render patches.
    """
    resolved_level = (
        _pixel_spacing_to_level(self, image_level)
        if isinstance(image_level, pixel_spacing_module.PixelSpacing)
        else image_level
    )
    return super().get_patch_bounds_dicom_instance_frame_numbers(
        resolved_level, patch_bounds_list
    )

  def create_icc_profile_transformation(
      self,
      icc_profile: Union[bytes, ImageCms.core.CmsProfile],
      rendering_intent: ImageCms.Intent = ImageCms.Intent.PERCEPTUAL,
  ) -> Optional[ImageCms.ImageCmsTransform]:
    """Returns a transformation from the pyramid colorspace to icc_profile.

    Args:
      icc_profile: Target ICC Profile to transform DICOM pyramid imaging to.
      rendering_intent: Rendering intent to use in transformation.

    Returns:
      PIL.ImageCmsTransformation to transform pixel imaging or None.
    """
    # The source profile is the ICC profile stored for this slide's pyramid.
    source_profile_bytes = self.get_icc_profile_bytes()
    return create_icc_profile_transformation(
        source_profile_bytes, icc_profile, rendering_intent
    )

  def get_image(
      self,
      pixel_spacing: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
          pixel_spacing_module.PixelSpacing,
      ],
  ) -> DicomImage:
    """Gets the image for a level, resized level, or pixel spacing.

    Args:
      pixel_spacing: A level or resized level to wrap directly, or a
        PixelSpacing that is first converted to a level of this slide.

    Returns:
      DicomImage bound to the resolved level and this slide.
    """
    source_level = (
        _pixel_spacing_to_level(self, pixel_spacing)
        if isinstance(pixel_spacing, pixel_spacing_module.PixelSpacing)
        else pixel_spacing
    )
    return DicomImage(source_level, self)

  def get_patch(
      self,
      level: Union[
          slide_level_map.Level,
          slide_level_map.ResizedLevel,
          pixel_spacing_module.PixelSpacing,
      ],
      x: int,
      y: int,
      width: int,
      height: int,
      require_fully_in_source_image: bool = False,
  ) -> DicomPatch:
    """Gets a patch from the slide at the requested level.

    A PixelSpacing argument is first converted to a level of this slide; all
    other arguments are forwarded unchanged to the base class implementation.

    Args:
      level: Level, resized level, or pixel spacing to read the patch from.
      x: Patch x coordinate, forwarded to the base class.
      y: Patch y coordinate, forwarded to the base class.
      width: Patch width, forwarded to the base class.
      height: Patch height, forwarded to the base class.
      require_fully_in_source_image: Forwarded to the base class by keyword.

    Returns:
      The patch returned by the base class implementation.
    """
    source_level = (
        _pixel_spacing_to_level(self, level)
        if isinstance(level, pixel_spacing_module.PixelSpacing)
        else level
    )
    return super().get_patch(
        source_level,
        x,
        y,
        width,
        height,
        require_fully_in_source_image=require_fully_in_source_image,
    )

  def get_frame(
      self,
      level: Union[slide_level_map.Level, pixel_spacing_module.PixelSpacing],
      frame_number: int,
  ) -> Optional[Frame]:
    """Gets a frame from a specific level or pixel spacing.

    The DICOMWeb API serves image pixels by the unit of frames. Frames have
    fixed size (width and height). Call get_patch() instead if you want to get
    an image patch at a specific location, or with a specific dimension.

    Args:
      level: Source pyramid level for frame imaging. If a PixelSpacing is given
        the pyramid level is derived from the pixel spacing.
      frame_number: The frame number to be fetched. The frames are stored in
        arrays with 1-based indexing.

    Returns:
      Returns the requested frame if exists, None otherwise.

    Raises:
      InputFrameNumberOutOfRangeError if the input frame_number is
      out of range.
    """
    source_level = (
        _pixel_spacing_to_level(self, level)
        if isinstance(level, pixel_spacing_module.PixelSpacing)
        else level
    )
    return super().get_frame(source_level, frame_number)

  def json_metadata_dict(
      self,
      level_subset: Optional[List[slide_level_map.Level]] = None,
      max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
  ) -> Mapping[str, Any]:
    """Returns slide level metadata as a JSON compatible mapping.

    The mapping records the whole slide microscopy SOP class UID, the slide
    path, and the serialized level map.

    Args:
      level_subset: Optional list of levels forwarded to the level map's
        serialization.
      max_json_encoded_icc_profile_size: Size limit forwarded to the level
        map's serialization.

    Returns:
      Mapping holding the slide's metadata.
    """
    metadata = {}
    metadata[_SOP_CLASS_UID] = (
        slide_level_map.VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID
    )
    metadata[_SLIDE_PATH] = self.path.to_dict()
    metadata[LEVEL_MAP] = self._level_map.to_dict(
        level_subset=level_subset,
        max_json_encoded_icc_profile_size=max_json_encoded_icc_profile_size,
    )
    return metadata

  def __eq__(self, other: Any) -> bool:
    """Returns True if other is a DicomSlide with the same path string."""
    if isinstance(other, DicomSlide):
      return str(self.path) == str(other.path)
    return False

  def __hash__(self) -> int:
    """Returns a hash derived from the path string, consistent with __eq__.

    A class that defines __eq__ without __hash__ has its __hash__ implicitly
    set to None, which makes instances unusable as dict keys or set members.
    """
    return hash(str(self.path))

  @property
  def all_pixel_spacing_mms(self) -> list[float]:
    """Lists the pixel spacing in mm of every level with a defined spacing."""
    spacings = []
    for level in self._level_map.level_map.values():
      # Levels without a defined pixel spacing are left out.
      if not level.pixel_spacing.is_defined:
        continue
      spacings.append(level.pixel_spacing.pixel_spacing_mm)
    return spacings

  def get_level_by_pixel_spacing(
      self,
      pixel_spacing: pixel_spacing_module.PixelSpacing,
      relative_pixel_spacing_equality_threshold: float = slide_level_map.MAX_LEVEL_DIST,
      maximum_downsample: float = 0.0,
  ) -> Optional[slide_level_map.Level]:
    """Gets the level corresponding to the input pixel spacing.

    Args:
      pixel_spacing: The pixel spacing to use for level lookup.
      relative_pixel_spacing_equality_threshold: Maximum relative difference in
        (pyramid level pixel spacing / desired pixel spacing) at which pixel
        spacings should be considered equivalent. E.g., a value of 0.25 means
        pyramid levels with pixel spacings that are 25% smaller to 25% larger
        are considered equivalent. Pathology imaging is typically represented
        as a pyramid with pyramid levels being 200% or greater relative
        magnifications of each other.
      maximum_downsample: Maximum degree to which it is acceptable to downsample
        pixel imaging to achieve a desired target pixel spacing. Only used when
        source imaging pixel spacing is < and != to desired target pixel
        spacing.

    Returns:
      The level corresponding to the input pixel spacing. None if the requested
      pixel spacing does not exist.
    """
    # The level map is queried with the spacing expressed in millimeters.
    target_spacing_mm = pixel_spacing.pixel_spacing_mm
    return self._level_map.get_level_by_pixel_spacing(
        target_spacing_mm,
        relative_pixel_spacing_equality_threshold=relative_pixel_spacing_equality_threshold,
        maximum_downsample=maximum_downsample,
    )

  def get_closest_level_with_pixel_spacing_equal_or_less_than_target(
      self,
      pixel_spacing: pixel_spacing_module.PixelSpacing,
  ) -> Optional[slide_level_map.Level]:
    """Gets the closest level with pixel spacing equal or smaller than target.

    Args:
      pixel_spacing: The target pixel spacing; its value in millimeters is used
        for the level map lookup.

    Returns:
      The `equal_or_smaller_pixel_spacing` entry of the level map's closest
      pixel spacing lookup; presumably None when no such level exists.
    """
    target_spacing_mm = pixel_spacing.pixel_spacing_mm
    closest_levels = self._level_map.get_levels_with_closest_pixel_spacing(
        target_spacing_mm
    )
    return closest_levels.equal_or_smaller_pixel_spacing

  def get_closest_level_with_pixel_spacing_greater_than_target(
      self,
      pixel_spacing: pixel_spacing_module.PixelSpacing,
  ) -> Optional[slide_level_map.Level]:
    """Gets the closest level with pixel spacing greater than the target.

    Args:
      pixel_spacing: The target pixel spacing; its value in millimeters is used
        for the level map lookup.

    Returns:
      The `greater_pixel_spacing` entry of the level map's closest pixel
      spacing lookup; presumably None when no such level exists.
    """
    target_spacing_mm = pixel_spacing.pixel_spacing_mm
    closest_levels = self._level_map.get_levels_with_closest_pixel_spacing(
        target_spacing_mm
    )
    return closest_levels.greater_pixel_spacing

  def get_instance_pixel_spacing(
      self, instance_uid: str
  ) -> Optional[pixel_spacing_module.PixelSpacing]:
    """Looks up the PixelSpacing of an instance through the level map.

    Args:
      instance_uid: The SOP Instance UID to find.

    Returns:
      PixelSpacing of the instance.

    Raises:
      PixelSpacingNotFoundForInstanceError if the instance is not in the level
      map (raised by the level map lookup, which is not visible here).
    """
    level_map = self._level_map
    return level_map.get_instance_pixel_spacing(instance_uid)

  @property
  def thumbnail(self) -> Optional[slide_level_map.Level]:
    """Returns the level map's thumbnail level, if any."""
    level_map = self._level_map
    return level_map.thumbnail

  @property
  def label(self) -> Optional[slide_level_map.Level]:
    """Returns the level map's label level, if any."""
    level_map = self._level_map
    return level_map.label

  @property
  def overview(self) -> Optional[slide_level_map.Level]:
    """Returns the level map's overview level, if any."""
    level_map = self._level_map
    return level_map.overview

  @property
  def levels(self) -> Iterator[slide_level_map.Level]:
    """Returns an iterator over the levels stored in the slide's level map."""
    pyramid_levels = self._level_map.level_map.values()
    return iter(pyramid_levels)

  def get_level_by_index(
      self, index: slide_level_map.LevelIndexType
  ) -> Optional[slide_level_map.Level]:
    """Returns the level the level map stores for index, if any.

    Args:
      index: Level index to look up in the slide's level map.
    """
    level_map = self._level_map
    return level_map.get_level(index)

  @property
  def wsi_label_overview_thumbnail_levels(
      self,
  ) -> Iterator[slide_level_map.Level]:
    """Returns an iterator over the thumbnail, label, and overview levels.

    Levels that are not defined (None) are skipped; the remaining levels are
    yielded in thumbnail, label, overview order.
    """
    candidates = [self.thumbnail, self.label, self.overview]
    defined_levels = [level for level in candidates if level is not None]
    return iter(defined_levels)

  @property
  def all_levels(self) -> Iterator[slide_level_map.Level]:
    """Returns an iterator over the level map's levels and ancillary levels.

    Levels from `levels` come first, followed by the defined thumbnail, label,
    and overview levels.
    """
    pyramid_levels = self.levels
    ancillary_levels = self.wsi_label_overview_thumbnail_levels
    return itertools.chain(pyramid_levels, ancillary_levels)

  def _filter_dicom_object(
      self,
      instance: dicom_web_interface.DicomObject,
      slide_instances_uid_set: set[str],
      filter_by_annotation_iod: Optional[str],
      filter_by_operator_id: Optional[str],
  ) -> bool:
    """Filters instances by annotation IOD, referenced series UID, and operator ID.

    An instance is kept when all of the following hold:
      1. Its SOP Class UID equals filter_by_annotation_iod (skipped when the
         filter is empty).
      2. It is in the slide's series, or any item of its Referenced Series
         Sequence names the slide's series, or any item of its Referenced
         Image Sequence names one of the slide's SOP instances.
      3. Any Person Identification Code Sequence item nested in its Operator
         Identification Sequence has a Long Code Value equal to
         filter_by_operator_id (skipped when the filter is empty).

    Every item of each sequence is inspected. Missing or empty sequences and
    items that lack the expected attribute are treated as "no match" rather
    than raising.

    Args:
      instance: DICOM object whose JSON tags are tested.
      slide_instances_uid_set: SOP Instance UIDs of the slide's instances.
      filter_by_annotation_iod: SOP Class UID the instance must have, or
        None/empty to accept any.
      filter_by_operator_id: Operator long code value the instance must carry,
        or None/empty to accept any.

    Returns:
      True if the instance passes every requested filter.
    """
    ds = pydicom.Dataset.from_json(instance.dicom_tags)

    # Filter by provided IOD.
    if filter_by_annotation_iod and ds.SOPClassUID != filter_by_annotation_iod:
      return False

    # Check if instance is referencing the slide's series or one of its
    # instances.
    slide_series_uid = self.path.series_uid
    # https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/common-instance-reference/00081115/0020000e
    referenced_series_uids = (
        getattr(item, 'SeriesInstanceUID', None)
        for item in getattr(ds, 'ReferencedSeriesSequence', None) or ()
    )
    # https://dicom.innolitics.com/ciods/microscopy-bulk-simple-annotations/microscopy-bulk-simple-annotations/00081140/00081155
    referenced_instance_uids = (
        getattr(item, 'ReferencedSOPInstanceUID', None)
        for item in getattr(ds, 'ReferencedImageSequence', None) or ()
    )
    # `or` short-circuits, so the sequences are scanned only when needed.
    reference_found = (
        ds.SeriesInstanceUID == slide_series_uid
        or any(
            uid is not None and uid == slide_series_uid
            for uid in referenced_series_uids
        )
        or any(
            uid is not None and uid in slide_instances_uid_set
            for uid in referenced_instance_uids
        )
    )
    if not reference_found:
      return False

    # Filter by operator ID.
    if not filter_by_operator_id:
      return True
    for operator in getattr(ds, 'OperatorIdentificationSequence', None) or ():
      person_codes = getattr(operator, 'PersonIdentificationCodeSequence', None)
      for person_code in person_codes or ():
        if getattr(person_code, 'LongCodeValue', None) == filter_by_operator_id:
          return True
    return False

  def find_annotation_instances(
      self,
      annotation_dicom_store: Optional[dicom_path.Path] = None,
      filter_by_annotation_iod: Optional[
          str
      ] = MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE,
      filter_by_operator_id: Optional[str] = None,
  ) -> Iterator[dicom_path.Path]:
    """Returns iterator that contains all of a slide's annotation instances.

    The annotation instances must either reference the slide's series UID or
    have the same series UID as the slide. The search covers the levels
    returned by `levels`.

    Args:
      annotation_dicom_store: The DICOM store to search for annotation
        instances. We assume here that the study/series UID stays the same
        between the slide and annotation DICOM stores.
      filter_by_annotation_iod: The annotation IOD to filter by. Default is
        Microscopy-Bulk-Simple-Annotations.
        https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
      filter_by_operator_id: The operator ID to filter by. This searches the
        Operator Identification Sequence long code value for the provided ID.
        https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/general-series/00081072/00401101/00080119

    Returns:
      An iterator that contains all of a slide's annotation instances.
    """
    slide_levels = self.levels
    return self._find_annotation_instances(
        slide_levels,
        annotation_dicom_store,
        filter_by_annotation_iod,
        filter_by_operator_id,
    )


class DicomMicroscopeImage(_DicomSeries):
  """Represents a Non-tiled DICOM pathology slide stored in a DICOMStore."""

  def __init__(
      self,
      dwi: dicom_web_interface.DicomWebInterface,
      path: dicom_path.Path,
      enable_client_slide_frame_decompression: bool = True,
      accession_number: Optional[str] = None,
      pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
      logging_factory: Optional[
          ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
      ] = None,
      slide_frame_cache: Optional[
          local_dicom_slide_cache.InMemoryDicomSlideCache
      ] = None,
      json_metadata: Union[str, Mapping[str, Any]] = '',
      instances: Optional[Collection[dicom_web_interface.DicomObject]] = None,
  ):
    """Builds the untiled image map from JSON metadata or DICOM instances.

    When non-empty JSON metadata is supplied the untiled image map is restored
    from it; otherwise the map is built from `instances`, or from the instances
    returned by `dwi.get_instances(path)` when `instances` is None.

    Args:
      dwi: The DicomWebInterface that has been configured to be able to access
        the series referenced by the input path.
      path: The path to a DICOM series object in a DICOMStore.
      enable_client_slide_frame_decompression: Set to True to enable client side
        frame decompression optimization (Recommended value = True); remove
        parameter following GG validation.
      accession_number: The accession_number of the slide.
      pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
        difference between row and column pixel spacings.
      logging_factory: The factory that EZ WSI uses to construct a logging
        interface.
      slide_frame_cache: Initialize to use shared slide cache.
      json_metadata: Optional slide level metadata, either a JSON string or an
        already decoded mapping. An empty value is ignored.
      instances: Optional collection of instances to use to initialize the
        image when no JSON metadata is given.

    Raises:
      ez_wsi_errors.InvalidSlideJsonMetadataError: The JSON string cannot be
        decoded, or reading the decoded metadata raised TypeError, IndexError,
        or KeyError.
      ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError: The slide path
        stored in the metadata differs from `path`.
    """
    super().__init__(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        accession_number,
        logging_factory,
        slide_frame_cache,
    )
    if not json_metadata:
      # No metadata supplied: derive the image map from DICOM instances.
      source_instances = (
          dwi.get_instances(path) if instances is None else instances
      )
      self._non_tiled_levels = slide_level_map.UntiledImageMap(
          source_instances,
          pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      )
      return
    # Metadata passed as a string must be decoded before it can be read.
    if isinstance(json_metadata, str):
      try:
        json_metadata = json.loads(json_metadata)
      except json.JSONDecodeError as exp:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata.'
        ) from exp
    try:
      expected_path = str(path)
      metadata_path = str(
          dicom_path.Path.from_dict(json_metadata[_SLIDE_PATH])
      )
      if expected_path != metadata_path:
        raise ez_wsi_errors.SlidePathDoesNotMatchJsonMetadataError(
            'Slide path does not match slide path in json metadata.'
        )
      self._non_tiled_levels = slide_level_map.UntiledImageMap.create_from_json(
          json_metadata[UNTILED_MICROSCOPE_IMAGE_MAP],
          pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
      )
    except (TypeError, IndexError, KeyError) as exp:
      raise ez_wsi_errors.InvalidSlideJsonMetadataError(
          'Incorrectly formatted JSON metadata.'
      ) from exp

  def __copy__(self) -> DicomMicroscopeImage:
    """Returns a shallow copy that owns a copy of the untiled image map."""
    # Bypass __init__; all attributes are shared except the untiled image
    # map, which is copied so the duplicate does not alias the original's.
    duplicate = DicomMicroscopeImage.__new__(DicomMicroscopeImage)
    state = dict(vars(self))
    state['_non_tiled_levels'] = copy.copy(self._non_tiled_levels)
    vars(duplicate).update(state)
    return duplicate

  def json_metadata_dict(
      self,
      level_subset: Optional[List[slide_level_map.Level]] = None,
      max_json_encoded_icc_profile_size: int = slide_level_map.DEFAULT_MAX_JSON_ENCODED_ICC_PROFILE_SIZE_IN_BYTES,
  ) -> Mapping[str, Any]:
    """Returns json formatted slide level metadata.

    The mapping records the microscopic image SOP class UID, the slide path,
    and the serialized untiled image map.

    Args:
      level_subset: List of levels defined on the slide to selectively export
        metadata for.
      max_json_encoded_icc_profile_size: Accepted for interface compatibility;
        not used.

    Returns:
      Json formatted metadata.

    Raises:
      ez_wsi_errors.LevelNotFoundError: Level not defined on Slide (presumably
        raised by the untiled image map's serialization; not visible here).
    """
    del max_json_encoded_icc_profile_size
    metadata = {}
    metadata[_SOP_CLASS_UID] = slide_level_map.VL_MIROSCOPIC_IMAGE_SOP_CLASS_UID
    metadata[_SLIDE_PATH] = self.path.to_dict()
    metadata[UNTILED_MICROSCOPE_IMAGE_MAP] = self._non_tiled_levels.to_dict(
        level_subset=level_subset,
    )
    return metadata

  def __eq__(self, other: Any) -> bool:
    """Returns True if other is a DicomMicroscopeImage with the same path string."""
    if isinstance(other, DicomMicroscopeImage):
      return str(self.path) == str(other.path)
    return False

  def __hash__(self) -> int:
    """Returns a hash derived from the path string, consistent with __eq__.

    A class that defines __eq__ without __hash__ has its __hash__ implicitly
    set to None, which makes instances unusable as dict keys or set members.
    """
    return hash(str(self.path))

  def are_instances_concatenated(self, instance_uids: list[str]) -> bool:
    """Always returns False for this class.

    Args:
      instance_uids: SOP Instance UIDs to check; ignored.

    Returns:
      False, regardless of the UIDs provided.
    """
    del instance_uids  # Unused.
    return False

  @property
  def levels(self) -> Iterator[slide_level_map.Level]:
    """Returns an iterator over the levels stored in the untiled image map."""
    untiled_levels = self._non_tiled_levels.untiled_image_map.values()
    return iter(untiled_levels)

  def get_level_by_index(
      self, index: slide_level_map.LevelIndexType
  ) -> Optional[slide_level_map.Level]:
    """Returns the level stored under index, or None if there is none.

    Args:
      index: Key to look up in the untiled image map.
    """
    image_map = self._non_tiled_levels.untiled_image_map
    return image_map.get(index, None)

  @property
  def all_levels(self) -> Iterator[slide_level_map.Level]:
    """Returns the same iterator as `levels`."""
    every_level = self.levels
    return every_level

  def get_level_icc_profile_bytes(
      self, level: Union[slide_level_map.Level, slide_level_map.ResizedLevel]
  ) -> bytes:
    """Returns ICC Profile bytes for a level of the image.

    Args:
      level: Level to return the ICC profile for. For a resized level the
        profile of its source level is returned.

    Returns:
      ICC profile bytes reported by the untiled image map.
    """
    source_level = (
        level.source_level
        if isinstance(level, slide_level_map.ResizedLevel)
        else level
    )
    return self._non_tiled_levels.get_level_icc_profile_bytes(
        source_level, self._dwi
    )

  def find_annotation_instances(
      self,
      level: slide_level_map.Level,
      annotation_dicom_store: Optional[dicom_path.Path] = None,
      filter_by_annotation_iod: Optional[
          str
      ] = MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_STORAGE,
      filter_by_operator_id: Optional[str] = None,
  ) -> Iterator[dicom_path.Path]:
    """Returns iterator that contains an image's annotation instances.

    The annotation instances must either reference the slide's series UID or
    have the same series UID as the slide. Only the single provided level is
    searched.

    Args:
      level: The image to search for annotation instances.
      annotation_dicom_store: The DICOM store to search for annotation
        instances. We assume here that the study/series UID stays the same
        between the slide and annotation DICOM stores.
      filter_by_annotation_iod: The annotation IOD to filter by. Default is
        Microscopy-Bulk-Simple-Annotations.
        https://dicom.nema.org/medical/dicom/current/output/chtml/part04/sect_b.5.html
      filter_by_operator_id: The operator ID to filter by. This searches the
        Operator Identification Sequence long code value for the provided ID.
        https://dicom.innolitics.com/ciods/vl-whole-slide-microscopy-image/general-series/00081072/00401101/00080119

    Returns:
      An iterator that contains the matching annotation instances.
    """
    single_level = iter((level,))
    return self._find_annotation_instances(
        single_level,
        annotation_dicom_store,
        filter_by_annotation_iod,
        filter_by_operator_id,
    )


def _add_sop_class_uid(
    sop_class_uid: str, sop_class_uid_found: Set[str]
) -> None:
  """Records sop_class_uid if it identifies supported microscope imaging.

  Args:
    sop_class_uid: SOP Class UID to test.
    sop_class_uid_found: Set updated in place; the UID is added when it is the
      VL whole slide microscopy image SOP class or one of the untiled image
      SOP classes.
  """
  is_whole_slide = (
      sop_class_uid
      == slide_level_map.VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID
  )
  if (
      not is_whole_slide
      and sop_class_uid not in slide_level_map.UNTILED_IMAGE_SOP_CLASS_UID
  ):
    return
  sop_class_uid_found.add(sop_class_uid)


def _load_series(
    dwi: dicom_web_interface.DicomWebInterface,
    path: dicom_path.Path,
    enable_client_slide_frame_decompression: bool,
    json_metadata: Union[str, Mapping[str, Any]],
    instances: Optional[Collection[dicom_web_interface.DicomObject]],
    sop_class_uid_found: Set[str],
    pixel_spacing_diff_tolerance: float,
    logging_factory: Optional[
        ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
    ],
) -> Tuple[Optional[DicomSlide], Optional[DicomMicroscopeImage]]:
  """Loads DicomSlide and/or DicomMicroscopeImage defined on a series.

  Args:
    dwi: DicomWebInterface passed to the constructed objects.
    path: DICOM path passed to the constructed objects.
    enable_client_slide_frame_decompression: Passed to the constructed objects.
    json_metadata: JSON metadata passed to the constructed objects.
    instances: Optional DICOM instances passed to the constructed objects.
    sop_class_uid_found: SOP Class UIDs found on the series; selects which
      objects are constructed.
    pixel_spacing_diff_tolerance: Passed to the constructed objects.
    logging_factory: Passed to the constructed objects.

  Returns:
    Tuple of (DicomSlide, DicomMicroscopeImage). The slide is None unless the
    VL whole slide microscopy SOP class was found; the microscope image is None
    unless an untiled image SOP class was found.
  """
  vl_whole_slide_image = None
  if (
      slide_level_map.VL_WHOLE_SLIDE_MICROSCOPY_IMAGE_SOP_CLASS_UID
      in sop_class_uid_found
  ):
    vl_whole_slide_image = DicomSlide(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        json_metadata=json_metadata,
        instances=instances,
        pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
        logging_factory=logging_factory,
    )
  microscope_image = None
  untiled_sop_classes_found = (
      slide_level_map.UNTILED_IMAGE_SOP_CLASS_UID.intersection(
          sop_class_uid_found
      )
  )
  if untiled_sop_classes_found:
    microscope_image = DicomMicroscopeImage(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        json_metadata=json_metadata,
        instances=instances,
        pixel_spacing_diff_tolerance=pixel_spacing_diff_tolerance,
        logging_factory=logging_factory,
    )
  return (vl_whole_slide_image, microscope_image)


class DicomMicroscopeSeries:
  """Loads the microscope images defined on a DICOM series.

  Attributes:
    dicom_slide: DicomSlide for the series, or None if the series (or the JSON
      metadata) does not describe VL whole slide microscopy imaging.
    dicom_microscope_image: DicomMicroscopeImage for the series, or None if the
      series (or the JSON metadata) does not describe untiled imaging.
  """

  def __init__(
      self,
      dwi: dicom_web_interface.DicomWebInterface,
      path: dicom_path.Path,
      enable_client_slide_frame_decompression: bool = True,
      json_metadata: Union[str, Mapping[str, Any]] = '',
      instance_uids: Optional[list[str]] = None,
      pixel_spacing_diff_tolerance: float = pixel_spacing_module.PIXEL_SPACING_DIFF_TOLERANCE,
      logging_factory: Optional[
          ez_wsi_logging_factory.AbstractLoggingInterfaceFactory
      ] = None,
  ):
    """Constructor.

    When JSON metadata is supplied, the SOP class UID stored in it selects
    which image object is restored and no instance query is made here.
    Otherwise the series' instances are fetched through `dwi` and the SOP class
    UIDs found on them select which image objects are built.

    Args:
      dwi: The DicomWebInterface used to access the series.
      path: The DICOM path of the series.
      enable_client_slide_frame_decompression: Passed to the image objects.
      json_metadata: Optional metadata, either a JSON string or an already
        decoded mapping.
      instance_uids: Optional SOP Instance UIDs. When given, only instances
        with these UIDs are considered when detecting which SOP classes are
        present; all fetched instances are still passed to the image objects.
        Ignored when JSON metadata is supplied.
      pixel_spacing_diff_tolerance: The tolerance (percentage difference) for
        difference between row and column pixel spacings.
      logging_factory: The factory that EZ WSI uses to construct a logging
        interface.

    Raises:
      ez_wsi_errors.InvalidSlideJsonMetadataError: The JSON metadata cannot be
        decoded, lacks a SOP class UID, or names a SOP class for which no
        image object is created.
    """
    if json_metadata:
      sop_class_uid_found = set()
      # if metadata is defined convert string to json.
      try:
        if isinstance(json_metadata, str):
          json_metadata = json.loads(json_metadata)
        sop_class_uid = json_metadata[_SOP_CLASS_UID]
        _add_sop_class_uid(sop_class_uid, sop_class_uid_found)
      except (json.JSONDecodeError, KeyError, TypeError, IndexError) as exp:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata.'
        ) from exp
      self.dicom_slide, self.dicom_microscope_image = _load_series(
          dwi,
          path,
          enable_client_slide_frame_decompression,
          json_metadata,
          None,
          sop_class_uid_found,
          pixel_spacing_diff_tolerance,
          logging_factory,
      )
      if self.dicom_slide is None and self.dicom_microscope_image is None:
        raise ez_wsi_errors.InvalidSlideJsonMetadataError(
            'Error decoding JSON metadata does not encode DICOM imaging.'
        )
      return
    dicom_instances = dwi.get_instances(path)
    # Hash the requested UIDs once so each membership test in the loop is O(1)
    # instead of a linear scan of the list.
    selected_uids = None if instance_uids is None else frozenset(instance_uids)
    sop_class_uid_found = set()
    for instance in dicom_instances:
      if (
          selected_uids is None
          or instance.get_value(tags.SOP_INSTANCE_UID) in selected_uids
      ):
        _add_sop_class_uid(
            instance.get_value(tags.SOP_CLASS_UID),
            sop_class_uid_found,
        )
    # Empty metadata makes the image objects initialize from the instances.
    self.dicom_slide, self.dicom_microscope_image = _load_series(
        dwi,
        path,
        enable_client_slide_frame_decompression,
        {},
        dicom_instances,
        sop_class_uid_found,
        pixel_spacing_diff_tolerance,
        logging_factory,
    )
