pathology/dicom_proxy/annotations_util.py (810 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flask blueprint for Annotations using the DICOM Proxy."""
from concurrent import futures
import dataclasses
import http
import io
import json
import os
import re
import tempfile
from typing import Any, IO, List, Mapping, MutableMapping, Optional, Set, Union
from xml.etree import ElementTree as ET
import flask
import pydicom
import requests
import requests_toolbelt
from pathology.dicom_proxy import dicom_proxy_flags
from pathology.dicom_proxy import dicom_store_util
from pathology.dicom_proxy import dicom_tag_util
from pathology.dicom_proxy import dicom_url_util
from pathology.dicom_proxy import flask_util
from pathology.dicom_proxy import metadata_util
from pathology.dicom_proxy import proxy_const
from pathology.dicom_proxy import user_auth_util
from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.shared_libs.pydicom_version_util import pydicom_version_util
# Constants
# DICOM tags used to read the annotation creator from an instance (see
# delete_instance, which requests these keywords as additional tags).
_OPERATOR_IDENTIFICATION_SEQUENCE_TAG = dicom_tag_util.DicomTag(
    '00081072', 'OperatorIdentificationSequence'
)
_PERSON_IDENTIFICATION_CODE_SEQUENCE_TAG = dicom_tag_util.DicomTag(
    '00401101', 'PersonIdentificationCodeSequence'
)
_LONG_CODE_VALUE_TAG = dicom_tag_util.DicomTag('00080119', 'LongCodeValue')
_CODE_MEANING_TAG = dicom_tag_util.DicomTag('00080104', 'CodeMeaning')
# SOPClassUID value this module compares against to decide whether an instance
# is handled as an annotation here or proxied to the DICOM store.
_MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD = '1.2.840.10008.5.1.4.1.1.91.1'
# Loose shape check (text@text.text) applied to the normalized user email.
_EMAIL_VALIDATION_REGEX = re.compile(r'.+@.+\..+')
# HTTP header names and content-type values used in this module.
_ACCEPT = 'Accept'
_APPLICATION_DICOM_JSON = 'application/dicom+json'
_APPLICATION_DICOM = 'application/dicom'
_APPLICATION_DICOM_XML = 'application/dicom+xml'
_TEXT_PLAIN = 'text/plain'
_MULTIPART_RELATED = 'multipart/related'
# Sentinel returned by _get_sop_class_uid_of_part for parts whose content type
# is neither DICOM JSON nor binary DICOM.
_UNRECOGNIZED_CONTENT_TYPE = 'unrecognized_content_type'
_CONTENT_TYPE = 'content-type'
# Message returned to the client when the user cannot be authenticated against
# the annotation DICOM store.
_UNAUTHORIZED_CLIENT_ACCESS = 'Error cannot read from DICOM store.'
# Email prefix removed by _normalize_email.
_GOOGLE_ACCOUNT_EMAIL_PREFIX = 'accounts.google.com:'
# Captures, in order: optional leading path, project, location, dataset,
# DICOM store, optional trailing path. Matching ignores case.
_DICOM_STORE_ID = re.compile(
    r'(.*/)?projects/(.*?)/locations/(.*?)/datasets/(.*?)/dicomStores/(.*?)(/.*)?',
    re.IGNORECASE,
)
# SOPClassUID tag address; used as a DICOM JSON key and as the specific tag
# requested when reading binary DICOM.
_SOP_CLASS_UID_DICOM_TAG_ADDRESS = '00080016'
# DICOM JSON element keys / values inspected by _patch_metadata.
_SQ = 'SQ'
_VALUE = 'Value'
_VR = 'vr'
def _norm_dicom_store_url(store_url: str) -> str:
  """Returns a lower-case, comparable identifier for a DICOM store URL.

  Args:
    store_url: URL or path that identifies a DICOM store.

  Returns:
    'project:location:dataset:store' in lower case when store_url matches the
    expected projects/.../dicomStores/... layout; otherwise the lower-cased
    input (a warning is logged in that case).
  """
  parsed = _DICOM_STORE_ID.fullmatch(store_url)
  if parsed is not None:
    # Groups 2-5 hold the project, location, dataset, and store ids; the first
    # and last groups are the optional leading / trailing path segments.
    _, project, location, dataset, store, _ = parsed.groups()
    return ':'.join((project, location, dataset, store)).lower()
  cloud_logging_client.warning(
      'Unexpected dicom store url formatting.', {'url': store_url}
  )
  return store_url.lower()
def _get_dicom_store_allow_set() -> Set[str]:
  """Returns the normalized annotation DICOM store allow list as a set."""
  allow_list = dicom_proxy_flags.DICOM_ANNOTATIONS_STORE_ALLOW_LIST.value
  return set(map(_norm_dicom_store_url, allow_list))
@dataclasses.dataclass(frozen=True)
class _MpRequestPart:
  """Immutable holder for one part of a multipart request.

  Instances are consumed by _multipart_encoder, which writes each header as
  'key: value' followed by the content.
  """

  # Raw header names mapped to raw header values for the part.
  headers: Mapping[bytes, bytes]
  # Raw body bytes of the part.
  content: bytes
class _UnableToAuthenticateUserError(Exception):
  """Raised when the requesting user cannot be authenticated.

  Signals that the user either lacks access to, or could not be authenticated
  for reading and writing, the annotations DICOM Store.
  """
class _MultiPartContentSopClassUidDecodingError(Exception):
  """Raised when the SOPClassUID of a multipart part cannot be determined."""
class _InvalidDicomJsonError(Exception):
  """Raised when DICOM JSON cannot be converted into a pydicom dataset."""
class _ServiceAccountCredentials(user_auth_util.AuthSession):
  """Auth session for annotation store access on behalf of a verified user.

  Construction first checks, with the credentials found on the incoming request
  headers, that the caller can query the annotation DICOM store and has a
  well-formed email. The caller's email, authorization, and authority are then
  kept on the instance and _init_to_service_account_credentials() is invoked
  (defined on the base class; presumably it switches the base accessors to the
  proxy server's service account -- confirm against user_auth_util).
  """

  def __init__(self, dicom_web_base_url: dicom_url_util.DicomWebBaseURL):
    """Validates the requesting user and initializes the session.

    Args:
      dicom_web_base_url: Base URL of DICOM Annotations store for request.

    Raises:
      _UnableToAuthenticateUserError: If the user's email cannot be retrieved,
        the store query returns an HTTP error, or the user's email is empty or
        not shaped like an email address.
    """
    denied_msg = 'User does not have access or cannot be authenticated.'
    # Minimal query used only to test that the user can read from the store.
    probe_url = (
        f'{dicom_url_util._HEALTHCARE_API_URL}/{dicom_web_base_url}'
        '/studies?limit=1'
    )
    try:
      super().__init__(flask_util.get_headers())
      # NOTE(review): the log context carries the caller's authorization
      # value; confirm the logging client redacts it.
      log_context = {
          proxy_const.LogKeywords.USER_EMAIL: self.email,
          proxy_const.LogKeywords.EMAIL_REGEX: _EMAIL_VALIDATION_REGEX,
          proxy_const.LogKeywords.AUTHORIZATION: self.authorization,
          proxy_const.LogKeywords.AUTHORITY: self.authority,
      }
      probe_response = requests.get(
          probe_url,
          headers=self.add_to_header({_ACCEPT: _APPLICATION_DICOM_JSON}),
          stream=False,
      )
      try:
        probe_response.raise_for_status()
        email_is_valid = self.email and _EMAIL_VALIDATION_REGEX.fullmatch(
            _normalize_email(self.email)
        )
        if not email_is_valid:
          # Falls through to the generic failure raised at the end of
          # __init__.
          cloud_logging_client.error(
              'Could not authenticate user. User email empty or formattted'
              ' unexpectedly.',
              log_context,
          )
        else:
          # Capture the user's identity before the session is re-initialized.
          self._user_email = self.email
          self._user_authorization = self.authorization
          self._user_authority = self.authority
          cloud_logging_client.info(
              'Authenticated user has read access to annotation DICOM store.',
              log_context,
          )
          self._init_to_service_account_credentials()
          return
      except requests.exceptions.HTTPError as err:
        cloud_logging_client.error(denied_msg, log_context, err)
        raise _UnableToAuthenticateUserError(denied_msg) from err
    except user_auth_util.UserEmailRetrievalError as err:
      cloud_logging_client.error(
          denied_msg,
          {
              proxy_const.LogKeywords.USER_EMAIL: self.email,
              proxy_const.LogKeywords.EMAIL_REGEX: _EMAIL_VALIDATION_REGEX,
              proxy_const.LogKeywords.AUTHORIZATION: self.authorization,
              proxy_const.LogKeywords.AUTHORITY: self.authority,
          },
          err,
      )
      raise _UnableToAuthenticateUserError(denied_msg) from err
    cloud_logging_client.error(denied_msg)
    raise _UnableToAuthenticateUserError(denied_msg)

  # Convenience accessors for tokens and emails.
  #
  # All http auth against the DICOM store should rely on the base class
  # accessors:
  #   self.email
  #   self.authorization
  #   self.authority

  @property
  def service_account_email(self) -> str:
    """Email reported by the base session accessor."""
    return self.email

  @property
  def service_account_authorization(self) -> str:
    """Authorization reported by the base session accessor."""
    return self.authorization

  @property
  def service_account_authority(self) -> str:
    """Authority reported by the base session accessor."""
    return self.authority

  @property
  def user_email(self) -> str:
    """Email of the requesting user captured during construction."""
    return self._user_email

  @property
  def user_authorization(self) -> str:
    """Authorization of the requesting user captured during construction."""
    return self._user_authorization

  @property
  def user_authority(self) -> str:
    """Authority of the requesting user captured during construction."""
    return self._user_authority
def _normalize_email(email: str) -> str:
  """Returns the email in a canonical form suitable for comparison.

  The email is stripped of surrounding white space and lower-cased; if it then
  starts with the google account prefix (added by IAP), the prefix is removed
  and the remainder is stripped again.

  Args:
    email: User email.

  Returns:
    Normalized email.
  """
  normalized = email.strip().lower()
  prefix = _GOOGLE_ACCOUNT_EMAIL_PREFIX.lower()
  if not normalized.startswith(prefix):
    return normalized
  without_prefix = normalized[len(prefix) :]
  return without_prefix.strip()
def _are_emails_different(email_1: str, email_2: str) -> bool:
  """Returns True if the two emails differ after normalization."""
  first = _normalize_email(email_1)
  second = _normalize_email(email_2)
  return first != second
def _get_code_value(dataset: pydicom.dataset.Dataset) -> str:
  """Returns LongCodeValue if the dataset defines it, otherwise CodeValue."""
  has_long_value = _LONG_CODE_VALUE_TAG.keyword in dataset
  return dataset.LongCodeValue if has_long_value else dataset.CodeValue
def _set_code_value(dataset: pydicom.dataset.Dataset, value: str) -> None:
  """Stores value in CodeValue, or in LongCodeValue if longer than 16 chars.

  Args:
    dataset: Dataset to set the code value on.
    value: Code value to store.
  """
  # 16 is presumably the length limit of the CodeValue element -- confirm
  # against the DICOM standard.
  if len(value) > 16:
    dataset.LongCodeValue = value
    return
  dataset.CodeValue = value
def _convert_creator_email_to_dicom_dataset(
    user_email: str,
    institution_name: str,
) -> pydicom.dataset.Dataset:
  """Builds an OperatorIdentification dataset that identifies the user.

  Args:
    user_email: Email to store for user; it is normalized before being stored
      as the person identification code value.
    institution_name: Annotator's institution name.

  Returns:
    Dataset with InstitutionName set and a single-item
    PersonIdentificationCodeSequence describing the annotator.
  """
  annotator = pydicom.dataset.Dataset()
  _set_code_value(annotator, _normalize_email(user_email))
  annotator.CodeMeaning = 'Annotator Id'
  annotator.CodingSchemeDesignator = '99Google'

  operator = pydicom.dataset.Dataset()
  operator.InstitutionName = institution_name
  operator.PersonIdentificationCodeSequence = [annotator]
  return operator
def _verify_allowed_dicom_annotations_store_address(
    url: dicom_url_util.DicomWebBaseURL,
) -> bool:
  """Tests whether the request targets an allow-listed annotation store.

  Args:
    url: Base DICOMweb URL for store.

  Returns:
    True if the normalized store address is in the normalized allow list.
  """
  normalized_store = _norm_dicom_store_url(str(url))
  return normalized_store in _get_dicom_store_allow_set()
def _text_response(
    msg: Union[str, bytes], status: http.HTTPStatus
) -> flask.Response:
  """Returns a text/plain flask response with the given body and status."""
  plain_text_headers = dict(mimetype=_TEXT_PLAIN, content_type=_TEXT_PLAIN)
  return flask.Response(msg, status=status, **plain_text_headers)
def delete_instance(
    dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
    study_instance_uid: str,
    series_instance_uid: str,
    sop_instance_uid: str,
) -> flask.Response:
  """Flask entry point for a DICOMweb request to delete a DICOM instance.

  Requests for stores that are not allow-listed for annotations, and requests
  for instances whose SOPClassUID is not the microscopy bulk simple annotations
  IOD, are proxied to the DICOM store. Annotation instances are deleted with
  the service account credentials, and only when the creator email recorded in
  the instance matches the requesting user's email.

  Args:
    dicom_web_base_url: Base DICOMweb URL for store.
    study_instance_uid: Study uid of instance to delete.
    series_instance_uid: Series uid of instance to delete.
    sop_instance_uid: Instance uid of instance to delete.

  Returns:
    flask.Response
  """
  log_context = {
      proxy_const.LogKeywords.BASE_DICOMWEB_URL: str(dicom_web_base_url),
      proxy_const.LogKeywords.STUDY_INSTANCE_UID: study_instance_uid,
      proxy_const.LogKeywords.SERIES_INSTANCE_UID: series_instance_uid,
      proxy_const.LogKeywords.SOP_INSTANCE_UID: sop_instance_uid,
  }
  cloud_logging_client.info('Delete instance', log_context)

  # Stores outside the annotation allow list are not managed here.
  if not _verify_allowed_dicom_annotations_store_address(dicom_web_base_url):
    cloud_logging_client.debug(
        f'DICOM Store {dicom_web_base_url} is not'
        ' allow-listed. Proxying delete request.',
        {proxy_const.LogKeywords.DICOMWEB_URL: dicom_web_base_url},
        log_context,
    )
    return dicom_store_util.dicom_store_proxy()

  series_path = dicom_url_util.base_dicom_series_url(
      dicom_web_base_url,
      dicom_url_util.StudyInstanceUID(study_instance_uid),
      dicom_url_util.SeriesInstanceUID(series_instance_uid),
  )
  instance_path = dicom_url_util.series_dicom_instance_url(
      series_path, dicom_url_util.SOPInstanceUID(sop_instance_uid)
  )
  log_context[proxy_const.LogKeywords.DICOMWEB_URL] = instance_path

  # Fetch the instance metadata, including the tags that name its creator,
  # using the requesting user's credentials.
  creator_tag_keywords = [
      _OPERATOR_IDENTIFICATION_SEQUENCE_TAG.keyword,
      _PERSON_IDENTIFICATION_CODE_SEQUENCE_TAG.keyword,
      _LONG_CODE_VALUE_TAG.keyword,
      _CODE_MEANING_TAG.keyword,
  ]
  try:
    metadata_list = dicom_store_util.get_instance_tags(
        user_auth_util.AuthSession(flask_util.get_headers()),
        series_path,
        dicom_url_util.SOPInstanceUID(sop_instance_uid),
        additional_tags=creator_tag_keywords,
    )
  except dicom_store_util.DicomMetadataRequestError:
    metadata_list = {}

  if not metadata_list:
    # No metadata: distinguish "user cannot read the store" from "instance
    # does not exist".
    try:
      _ServiceAccountCredentials(dicom_web_base_url)
    except _UnableToAuthenticateUserError:
      response_msg = _UNAUTHORIZED_CLIENT_ACCESS
      cloud_logging_client.error(response_msg, log_context)
      return _text_response(response_msg, http.HTTPStatus.UNAUTHORIZED)
    # The user can read from the store, so the metadata read failed because
    # the instance does not exist.
    response_msg = 'DICOM instance does not exist.'
    cloud_logging_client.warning(response_msg, log_context)
    return _text_response(response_msg, http.HTTPStatus.NOT_FOUND)

  # Decode the metadata with pydicom.
  try:
    instance_dataset = pydicom.dataset.Dataset.from_json(
        dict(metadata_list[0])
    )
  except (pydicom.errors.InvalidDicomError, IndexError) as err:
    response_msg = 'Error decoding DICOM instance.'
    cloud_logging_client.error(response_msg, log_context, err)
    return _text_response(response_msg, http.HTTPStatus.BAD_REQUEST)

  # Instances that are not annotations are deleted by the store itself.
  try:
    if instance_dataset.SOPClassUID != _MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD:
      cloud_logging_client.debug(
          'DICOM SOPClassUID != MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD proxying'
          ' delete request.',
          {
              proxy_const.LogKeywords.SOP_CLASS_UID: (
                  instance_dataset.SOPClassUID
              )
          },
      )
      return dicom_store_util.dicom_store_proxy()
  except AttributeError as err:
    response_msg = 'Error cannot determine DICOM instance SOPClassUID.'
    cloud_logging_client.error(response_msg, log_context, err)
    return _text_response(response_msg, http.HTTPStatus.BAD_REQUEST)

  # Obtain the service account credentials used to perform the delete.
  try:
    service_account = _ServiceAccountCredentials(dicom_web_base_url)
  except _UnableToAuthenticateUserError:
    response_msg = _UNAUTHORIZED_CLIENT_ACCESS
    cloud_logging_client.error(response_msg, log_context)
    return _text_response(response_msg, http.HTTPStatus.UNAUTHORIZED)

  # Only the user who created the annotation may delete it.
  try:
    first_operator = instance_dataset.OperatorIdentificationSequence[0]
    creator_email = _get_code_value(
        first_operator.PersonIdentificationCodeSequence[0]
    )
  except (IndexError, AttributeError) as err:
    response_msg = (
        'Error deleting DICOM annotation instance. Missing creator email in'
        ' DICOM instance.'
    )
    cloud_logging_client.error(response_msg, log_context, err)
    return _text_response(response_msg, http.HTTPStatus.BAD_REQUEST)

  if _are_emails_different(creator_email, service_account.user_email):
    response_msg = (
        'Cannot delete annotation. User email does not match DICOM operator'
        ' identifier.'
    )
    cloud_logging_client.error(
        response_msg,
        log_context,
        {
            'operator_identifer': creator_email,
            proxy_const.LogKeywords.USER_EMAIL: service_account.user_email,
        },
    )
    return _text_response(response_msg, http.HTTPStatus.UNAUTHORIZED)

  cloud_logging_client.info(
      'Validated annotation creator email for annotation delete.',
      log_context,
      {proxy_const.LogKeywords.USER_EMAIL: creator_email},
  )
  return dicom_store_util.delete_instance_from_dicom_store(
      service_account, instance_path
  )
def _get_operator_identifier(dcm_file: pydicom.dataset.Dataset) -> List[str]:
  """Returns the code values of every person in the operator sequence.

  Args:
    dcm_file: WSI annotation dataset.

  Returns:
    Code value of each PersonIdentificationCodeSequence item of each
    OperatorIdentificationSequence item, or an empty list if any required
    attribute is missing.
  """
  try:
    return [
        _get_code_value(person)
        for operator in dcm_file.OperatorIdentificationSequence
        for person in operator.PersonIdentificationCodeSequence
    ]
  except AttributeError:
    return []
def _fix_incorrectly_formatted_filemeta_header(
    dcm: pydicom.FileDataset,
) -> pydicom.FileDataset:
  """Returns the DICOM with a complete file meta header.

  Args:
    dcm: DICOM instance to check.

  Returns:
    dcm unchanged if its file meta already defines all required elements;
    otherwise the instance re-read after being written to a temporary file.
  """
  required_keywords = (
      'MediaStorageSOPClassUID',
      'MediaStorageSOPInstanceUID',
      'FileMetaInformationGroupLength',
      'FileMetaInformationVersion',
      'ImplementationClassUID',
  )
  missing_keywords = [
      keyword for keyword in required_keywords if keyword not in dcm.file_meta
  ]
  if not missing_keywords:
    return dcm
  # Writing the instance out forces pydicom to add the missing file meta
  # elements; the written file is then read back.
  with tempfile.TemporaryDirectory() as scratch_dir:
    scratch_path = os.path.join(scratch_dir, 'temp.dcm')
    pydicom_version_util.save_as_validated_dicom(dcm, scratch_path)
    return pydicom.dcmread(scratch_path)
def _upload_wsi_annotation(
    service_account: _ServiceAccountCredentials,
    dcm_file: pydicom.FileDataset,
    store_instance_path: str,
    headers: Mapping[str, str],
) -> flask.Response:
  """Uploads a part 10 DICOM annotation to the DICOM Store.

  The annotation must describe no operator (the current user is then added as
  the operator) or exactly one operator whose identifier matches the current
  user's email.

  Args:
    service_account: Credentials used for the upload; also provides the
      requesting user's email.
    dcm_file: Annotation instance to upload.
    store_instance_path: Store path the instance is uploaded to.
    headers: Headers passed to the upload request.

  Returns:
    Response of the upload, or a BAD_REQUEST text response if the operator
    check fails.
  """
  dcm_file = _fix_incorrectly_formatted_filemeta_header(dcm_file)
  user_email = service_account.user_email
  operators = _get_operator_identifier(dcm_file)

  if len(operators) > 1:
    error_msg = (
        'Failed to create DICOM Annotation. DICOM annotation describes multiple'
        ' operators.'
    )
    cloud_logging_client.error(error_msg, dcm_file.to_json_dict())
    return _text_response(error_msg, http.HTTPStatus.BAD_REQUEST)

  if not operators:
    cloud_logging_client.warning(
        'DICOM operator identifier is empty adding current user.',
        {proxy_const.LogKeywords.USER_EMAIL: user_email},
    )
    dcm_file.OperatorIdentificationSequence = [
        _convert_creator_email_to_dicom_dataset(
            user_email,
            dicom_proxy_flags.DEFAULT_ANNOTATOR_INSTITUTION_FLG.value,
        )
    ]
  elif _are_emails_different(operators[0], user_email):
    error_msg = (
        'Failed to create DICOM Annotation. Operator identifier does not '
        'match current user email.'
    )
    cloud_logging_client.error(
        error_msg,
        {
            'operator_identifer': operators[0],
            proxy_const.LogKeywords.USER_EMAIL: user_email,
        },
    )
    return _text_response(error_msg, http.HTTPStatus.BAD_REQUEST)

  cloud_logging_client.debug(
      'Sending annotation create request to dicom store.',
      dcm_file.to_json_dict(),
  )
  return dicom_store_util.upload_instance_to_dicom_store(
      service_account, dcm_file, store_instance_path, headers
  )
def _build_pydicom_dicom_from_request_json(
    content: bytes,
) -> pydicom.FileDataset:
  """Builds a pydicom FileDataset from DICOM store formatted JSON.

  The JSON may be a single instance (dict) or a list holding exactly one
  instance. Tags whose address starts with '0002' become the file meta
  header; all other tags become the dataset.

  Args:
    content: Bytes received in multipart DICOM json data.

  Returns:
    PyDicom FileDataset represented by JSON

  Raises:
    _InvalidDicomJsonError: Invalid DICOM JSON
  """
  try:
    decoded = json.loads(content)
    if isinstance(decoded, list):
      instance_count = len(decoded)
      if instance_count != 1:
        cloud_logging_client.error(
            'Invalid DICOM annotation JSON, unexpected number of parts.',
            {proxy_const.LogKeywords.JSON: decoded},
        )
        raise ValueError(f'Error found {instance_count} instances in part.')
      decoded = decoded[0]
    if not isinstance(decoded, dict):
      cloud_logging_client.error(
          'Invalid DICOM annotation JSON, metadata is not a dict.',
          {proxy_const.LogKeywords.JSON: decoded},
      )
      raise ValueError('Invalid formatted DICOM JSON.')

    file_meta_tags = {
        address: element
        for address, element in decoded.items()
        if address.startswith('0002')
    }
    dataset_tags = {
        address: element
        for address, element in decoded.items()
        if not address.startswith('0002')
    }
    file_meta = pydicom.dataset.Dataset.from_json(json.dumps(file_meta_tags))
    base_dataset = pydicom.Dataset.from_json(json.dumps(dataset_tags))
    dicom_instance = pydicom.dataset.FileDataset(
        '',
        base_dataset,
        preamble=b'\0' * 128,
        file_meta=pydicom.dataset.FileMetaDataset(file_meta),
    )
    cloud_logging_client.info('Generated DICOM annotation instance.')
    return dicom_instance
  except Exception as err:
    # Any failure above is reported to the caller as invalid DICOM JSON.
    raise _InvalidDicomJsonError(
        f'Error decoding DICOM from JSON. JSON: {content}'
    ) from err
def _get_status_code_from_multiple_responses(
    responses: List[flask.Response],
) -> http.HTTPStatus:
  """Reduces the status codes of several store requests to a single code.

  DICOM API (Table 6.6.1-1. HTTP/1.1 Standard Response Code):
  https://dicom.nema.org/dicom/2013/output/chtml/part18/sect_6.6.html

  Args:
    responses: List of responses from multiple store requests.

  Returns:
    The shared status code if every response has the same one; otherwise
    ACCEPTED if at least one code is below 300, else CONFLICT.
  """
  distinct_codes = {response.status_code for response in responses}
  if len(distinct_codes) == 1:
    (shared_code,) = distinct_codes
    return shared_code
  for code in distinct_codes:
    if code < 300:
      return http.HTTPStatus.ACCEPTED
  return http.HTTPStatus.CONFLICT
def _generate_multipart_upload_response(
    responses: List[flask.Response],
) -> flask.Response:
  """Combines a list of DICOM insert responses into a single response.

  DICOM XML responses are merged into the first XML document, DICOM JSON
  responses are merged into one JSON document, and any other response bodies
  are concatenated. If more than one kind of body is present, or any body has
  another content type, the combined body is returned as plain text.

  Args:
    responses: Responses from the individual upload requests.

  Returns:
    BAD_REQUEST text response if responses is empty; the response itself if
    there is exactly one; otherwise the combined response. If a body cannot be
    parsed, all raw bodies are returned joined as plain text.
  """
  if not responses:
    return _text_response('Bad Request.', http.HTTPStatus.BAD_REQUEST)
  if len(responses) == 1:
    return responses[0]
  status_code = _get_status_code_from_multiple_responses(responses)
  try:
    dicom_xml = None
    json_response = []
    other_response = []
    content_type = _TEXT_PLAIN
    for response in responses:
      # Content type of this response. After the loop this holds the type of
      # the last response; it is overridden below whenever bodies are mixed.
      content_type = (
          flask_util.get_key_value(response.headers, _CONTENT_TYPE, _TEXT_PLAIN)
          .lower()
          .strip()
      )
      response_bytes = response.get_data(as_text=False)
      # Log the content type as a string; the previous code wrapped it in a
      # set literal by mistake.
      cloud_logging_client.info(
          'Generating multipart upload.',
          {'response_bytes': response_bytes, 'content-type': content_type},
      )
      if content_type == _APPLICATION_DICOM_XML:
        # Merge the children of later XML documents into the first document.
        if dicom_xml is None:
          dicom_xml = ET.fromstring(response_bytes)
        else:
          for child in ET.fromstring(response_bytes).findall('*'):
            dicom_xml.append(child)
      elif content_type == _APPLICATION_DICOM_JSON:
        # Flatten JSON lists so the merged document is a single list.
        dicom = json.loads(response_bytes)
        if isinstance(dicom, list):
          json_response.extend(dicom)
        else:
          json_response.append(dicom)
      else:
        other_response.append(response_bytes)

    response_bytes = b''
    if dicom_xml is not None:
      response_bytes = ET.tostring(dicom_xml)
    if json_response:
      if len(json_response) == 1:
        json_response = json_response[0]
      json_response = json.dumps(json_response).encode('us-ascii')
      if response_bytes:
        # Both XML and JSON found; combine and return as plain text.
        content_type = _TEXT_PLAIN
        response_bytes = b'\n\n'.join([response_bytes, json_response])
      else:
        response_bytes = json_response
    if other_response:
      # Something unexpected was found; return as plain text combined with
      # any XML / JSON results.
      content_type = _TEXT_PLAIN
      other_response = b'\n\n'.join(other_response)
      if response_bytes:
        response_bytes = b'\n\n'.join([response_bytes, other_response])
      else:
        response_bytes = other_response
    return flask.Response(
        response_bytes,
        status=status_code,
        content_type=content_type,
    )
  except (
      ET.ParseError,
      json.decoder.JSONDecodeError,
      # json.loads raises UnicodeDecodeError when the bytes cannot be decoded;
      # treat it like any other undecodable body.
      UnicodeDecodeError,
  ) as exp:
    response_bytes = b'\n\n'.join(
        [response.get_data(as_text=False) for response in responses]
    )
    cloud_logging_client.error(
        'Error decoding responses.', {'responses': response_bytes}, exp
    )
    return _text_response(response_bytes, status_code)
def _get_pydicom_sopclassuid(dcm_bytes: IO[bytes]) -> str:
  """Reads the SOPClassUID from part10 binary DICOM.

  Only the SOPClassUID tag is requested from the reader.

  Args:
    dcm_bytes: Part10 binary DICOM bytes.

  Returns:
    SOPClassUID of the DICOM.

  Raises:
    pydicom.errors.InvalidDicomError: Bytes are invalid.
  """
  requested_tags = [_SOP_CLASS_UID_DICOM_TAG_ADDRESS]
  with pydicom.dcmread(dcm_bytes, specific_tags=requested_tags) as dataset:
    return dataset.SOPClassUID
def _get_sop_class_uid_of_part(
    part_content_type: str, mp_data_content: bytes
) -> str:
  """Determines the SOPClassUID of the instance encoded in a multipart part.

  Args:
    part_content_type: Content type of part.
    mp_data_content: Part data.

  Returns:
    SOPClassUID for DICOM JSON and binary DICOM parts; the unrecognized
    content type sentinel for any other content type.

  Raises:
    _MultiPartContentSopClassUidDecodingError: Cannot determine SOPClassUID.
  """
  try:
    if part_content_type == _APPLICATION_DICOM:
      with io.BytesIO(mp_data_content) as dicom_stream:
        return _get_pydicom_sopclassuid(dicom_stream)
    if part_content_type != _APPLICATION_DICOM_JSON:
      # An unrecognized content type could be a variety of things.
      return _UNRECOGNIZED_CONTENT_TYPE
    decoded = json.loads(mp_data_content)
    if isinstance(decoded, list):
      # A list must describe exactly one instance.
      if len(decoded) != 1:
        raise ValueError(f'Error found {len(decoded)} instances in part.')
      decoded = decoded[0]
    if not isinstance(decoded, dict):
      raise ValueError('Invalid formatted DICOM JSON.')
    sop_class_element = decoded[_SOP_CLASS_UID_DICOM_TAG_ADDRESS]
    return sop_class_element[_VALUE][0]
  except Exception as err:
    raise _MultiPartContentSopClassUidDecodingError(
        f'Error determining SOPClassUID. Content-Type: {part_content_type};'
        f' Content: {mp_data_content}'
    ) from err
def _multipart_encoder(parts: List[_MpRequestPart], boundary: bytes) -> bytes:
  """Serializes parts back into the body of a multipart request.

  Args:
    parts: Parts to encode, in order.
    boundary: Multipart boundary (without the leading dashes).

  Returns:
    For each part: '--boundary' CRLF, one 'key: value' CRLF line per header, a
    blank line, the content, and CRLF; followed by the closing '--boundary--'.
  """
  crlf = b'\r\n'
  dashes = b'--'
  part_delimiter = b''.join([dashes, boundary, crlf])
  chunks = []
  for part in parts:
    chunks.append(part_delimiter)
    for header_name, header_value in part.headers.items():
      chunks.extend((header_name, b': ', header_value, crlf))
    # Blank line separates the headers from the content.
    chunks.extend((crlf, part.content, crlf))
  chunks.extend((dashes, boundary, dashes))
  return b''.join(chunks)
def store_instance(
    dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
    study_uid: str = '',
) -> flask.Response:
  """Flask entry point for DICOMweb request to store an annotation instance.

  Requests are proxied to the DICOM store unchanged when the store is not
  allow-listed for annotations, when the content-type header is missing or
  unrecognized, or when a binary DICOM upload is not a microscopy bulk simple
  annotation. Annotation instances are uploaded with the service account
  credentials after the requesting user has been authenticated. For
  multipart/related requests, parts that are not annotations are re-assembled
  into a multipart request and uploaded with the user's credentials.

  Args:
    dicom_web_base_url: Base DICOMweb URL for store.
    study_uid: Optional Study instance uid that added instances required to
      belong to.

  Returns:
    flask.Response
  """
  store_instance_path = f'{dicom_web_base_url.full_url}/studies'
  if study_uid:
    store_instance_path = f'{store_instance_path}/{study_uid}'
  base_log = {
      proxy_const.LogKeywords.BASE_DICOMWEB_URL: str(dicom_web_base_url),
      proxy_const.LogKeywords.STUDY_INSTANCE_UID: study_uid,
      proxy_const.LogKeywords.DICOM_STORE_URL: dicom_web_base_url,
      proxy_const.LogKeywords.DICOMWEB_URL: store_instance_path,
  }
  cloud_logging_client.info('Store instance annotation instance.', base_log)
  if not _verify_allowed_dicom_annotations_store_address(dicom_web_base_url):
    cloud_logging_client.debug(
        f'The DICOM Store {dicom_web_base_url} in the request is not'
        ' allow-listed for annotations; proxying request.',
        base_log,
    )
    # Not uploading to an annotation DICOM store; proxy the request.
    return dicom_store_util.dicom_store_proxy()
  # Forward the caller's Accept header (if any) with the upload requests.
  accept_value = flask_util.get_key_value(flask_util.get_headers(), _ACCEPT, '')
  headers = {} if not accept_value else {_ACCEPT: accept_value}
  original_ct = flask_util.get_key_value(
      flask_util.get_headers(), _CONTENT_TYPE, ''
  ).strip()
  # Lower-cased, space-free content type used only for dispatching below; the
  # original value is what is passed on to the multipart decoder and store.
  content_type = original_ct.lower().replace(' ', '')
  if not content_type:
    # If content type is empty proxy request; will return error.
    cloud_logging_client.debug(
        'Missing content-type header; proxying annotation store request.',
        base_log,
    )
    return dicom_store_util.dicom_store_proxy()
  cloud_logging_client.info(
      f'Store annotation content-type={content_type}', base_log
  )
  if content_type == _APPLICATION_DICOM:
    # Upload of a single Part10 binary DICOM instance.
    with tempfile.TemporaryFile() as dicom_instance:
      dicom_instance.write(flask_util.get_data())
      dicom_instance.seek(0)
      try:
        # Read only the SOPClassUID to test whether the instance is a
        # MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD.
        sopclass_uid = _get_pydicom_sopclassuid(dicom_instance)
        if sopclass_uid != _MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD:
          # Not an annotation; proxy the request.
          cloud_logging_client.debug(
              'Annotation DICOM is not MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD;'
              ' proxying annotation store request.',
              base_log,
              {proxy_const.LogKeywords.SOP_CLASS_UID: sopclass_uid},
          )
          return dicom_store_util.dicom_store_proxy()
        # The instance is an annotation; read the DICOM fully.
        dicom_instance.seek(0)
        with pydicom.dcmread(dicom_instance) as dcm:
          # Get proxy service account credentials.
          try:
            service_account = _ServiceAccountCredentials(dicom_web_base_url)
          except _UnableToAuthenticateUserError as exp:
            cloud_logging_client.error(
                'Unable to authenticate user cannot save annotation.',
                base_log,
                exp,
            )
            return _text_response(
                _UNAUTHORIZED_CLIENT_ACCESS,
                http.HTTPStatus.UNAUTHORIZED,
            )
          # Upload instance to DICOM store using service account credentials.
          return _upload_wsi_annotation(
              service_account, dcm, store_instance_path, headers
          )
      except pydicom.errors.InvalidDicomError as exp:
        cloud_logging_client.error('Invalid DICOM instance.', base_log, exp)
        return _text_response(
            'Invalid DICOM instance.',
            http.HTTPStatus.BAD_REQUEST,
        )
  elif content_type.startswith(f'{_MULTIPART_RELATED};'):
    # Upload of multipart/related data; decode the received multipart body.
    try:
      mp_response = requests_toolbelt.MultipartDecoder(
          flask_util.get_data(), original_ct
      )
    except (
        requests_toolbelt.NonMultipartContentTypeException,
        requests_toolbelt.ImproperBodyPartContentException,
    ) as exp:
      cloud_logging_client.error('Invalid multipart request.', base_log, exp)
      return _text_response(
          'Invalid multipart request.', http.HTTPStatus.BAD_REQUEST
      )
    # Service account credentials are created lazily, only if the request
    # contains at least one annotation part.
    service_account = None
    non_annotation_parts = []
    boundary = mp_response.boundary
    upload_results = []
    # Process each part individually. Parts describing annotations are
    # uploaded using service account credentials; all other parts are
    # uploaded together using user credentials.
    for mp_data in mp_response.parts:
      # Get content type of part.
      part_content_type = flask_util.get_key_value(
          {
              key.decode('utf-8'): value.decode('utf-8')
              for key, value in mp_data.headers.items()
          },
          _CONTENT_TYPE,
          '',
      )
      if not part_content_type:
        msg = 'Invalid multipart part. Missing content-type.'
        cloud_logging_client.error(msg, base_log)
        return _text_response(msg, http.HTTPStatus.BAD_REQUEST)
      # Keep only the media type: drop parameters, spaces, and quotes.
      part_content_type = (
          part_content_type.split(';')[0].replace(' ', '').strip('"').lower()
      )
      # Determine SOPClassUID of part.
      try:
        sopclass_uid = _get_sop_class_uid_of_part(
            part_content_type, mp_data.content
        )
      except _MultiPartContentSopClassUidDecodingError as exp:
        # Log the raw bytes. Decoding the part as text (mp_data.text) raises
        # UnicodeDecodeError for undecodable binary content, which would
        # replace the BAD_REQUEST response below with an unhandled error.
        cloud_logging_client.error(
            'Cannot decode sopclass uid.',
            {proxy_const.LogKeywords.MULTIPART_CONTENT: mp_data.content},
            base_log,
            exp,
        )
        return _text_response(
            'Invalid multipart request.', http.HTTPStatus.BAD_REQUEST
        )
      # Parts that are not an annotation IOD are collected and uploaded later
      # as a multipart/related request that excludes the WSI annotations.
      if sopclass_uid != _MICROSCOPY_BULK_SIMPLE_ANNOTATIONS_IOD:
        non_annotation_parts.append(
            _MpRequestPart(mp_data.headers, mp_data.content)
        )
        continue
      # Part describes a WSI annotation DICOM instance. Read the instance;
      # it may be inline JSON or inline Part10 binary DICOM.
      try:
        if part_content_type == _APPLICATION_DICOM_JSON:
          dcm = _build_pydicom_dicom_from_request_json(mp_data.content)
        elif part_content_type == _APPLICATION_DICOM:
          with io.BytesIO(mp_data.content) as dcm_bytes:
            dcm = pydicom.dcmread(dcm_bytes)
        else:
          raise ValueError(
              f'Invalid multipart content-type {part_content_type}.'
          )
      except (
          ValueError,
          _InvalidDicomJsonError,
          pydicom.errors.InvalidDicomError,
      ) as exp:
        cloud_logging_client.error(
            'Invalid DICOM instance in multipart request.',
            {proxy_const.LogKeywords.MULTIPART_CONTENT: mp_data.content},
            exp,
            base_log,
        )
        return _text_response(
            'Invalid DICOM instance in multipart request.',
            http.HTTPStatus.BAD_REQUEST,
        )
      # Get service account credentials if not initialized.
      if service_account is None:
        try:
          service_account = _ServiceAccountCredentials(dicom_web_base_url)
        except _UnableToAuthenticateUserError as exp:
          cloud_logging_client.error(_UNAUTHORIZED_CLIENT_ACCESS, base_log, exp)
          return _text_response(
              _UNAUTHORIZED_CLIENT_ACCESS, status=http.HTTPStatus.UNAUTHORIZED
          )
      # Upload WSI annotation using service account credentials.
      upload_wsi_annotation = _upload_wsi_annotation(
          service_account, dcm, store_instance_path, headers
      )
      upload_results.append(upload_wsi_annotation)
    # If non-annotation parts were included in the multipart/related request,
    # re-assemble a multipart request without the annotations and upload it
    # to the store using user credentials.
    if non_annotation_parts:
      multipart_data = _multipart_encoder(non_annotation_parts, boundary)
      user_auth = user_auth_util.AuthSession(flask_util.get_headers())
      multipart_result = dicom_store_util.upload_multipart_to_dicom_store(
          user_auth,
          multipart_data,
          original_ct,
          store_instance_path,
          headers,
      )
      upload_results.append(multipart_result)
    # Combine the responses from the annotation uploads and the multipart
    # upload into a single response.
    return _generate_multipart_upload_response(upload_results)
  else:
    # Content type describes something else; proxy the request to the DICOM
    # store and return its response.
    cloud_logging_client.debug(
        'Unrecognized content-type; proxying annotation store request.',
        base_log,
    )
    return dicom_store_util.dicom_store_proxy()
def _patch_metadata(
    old_md: MutableMapping[str, Any], new_md: Mapping[str, Any]
) -> MutableMapping[str, Any]:
  """Adds tags missing from previously retrieved metadata.

  Temporary patch for bug in DICOM store. Remove/Disable when store supports
  returning metadata for short binary tags.

  Tags present only in new_md are copied into old_md. Tags present in both are
  left as they are, except for sequences of equal length, whose items are
  patched recursively. old_md is modified in place.

  Args:
    old_md: Metadata downloaded from the store in metadata transaction.
    new_md: Metadata retrieved from the store by downloading the instance.

  Returns:
    old_md, supplemented with the metadata that was missing from it and is
    described in the metadata retrieved by downloading the instance.
  """
  for tag, new_element in new_md.items():
    if tag not in old_md:
      old_md[tag] = new_element
      cloud_logging_client.debug(f'Patching metadata key: {tag}')
      continue
    old_element = old_md[tag]
    try:
      both_are_sequences = old_element[_VR] == _SQ and new_element[_VR] == _SQ
      if not both_are_sequences:
        continue
      old_items = old_element[_VALUE]
      new_items = new_element[_VALUE]
    except KeyError:
      # Element without a vr or value entry; nothing to patch.
      continue
    old_count = len(old_items)
    new_count = len(new_items)
    if old_count != new_count:
      cloud_logging_client.error(
          'Metadata SQ lengths are not identical leaving sq unchanged.',
          {
              'old_metadata_sq_len': old_count,
              'new_metadata_sq_len': new_count,
          },
      )
      continue
    for position, new_item in enumerate(new_items):
      old_items[position] = _patch_metadata(old_items[position], new_item)
  return old_md
def _get_annotation_metadata(
    user_auth: user_auth_util.AuthSession,
    dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
    uid: metadata_util.MetadataUID,
    old_metadata: Optional[MutableMapping[str, Any]],
    manager: dicom_store_util.MetadataThreadPoolDownloadManager,
) -> str:
  """Downloads an annotation instance and returns its metadata as JSON.

  Args:
    user_auth: User authentication used to connect to store.
    dicom_web_base_url: Base DICOMweb URL for store.
    uid: DICOM Study, Series, and Instance UID.
    old_metadata: Optional DICOM instance json formatted metadata; when given
      it is patched in place with the downloaded metadata and returned instead
      of the downloaded metadata.
    manager: Dicom instance thread pool manager; told the length of the
      returned JSON string.

  Returns:
    Metadata as JSON formatted string (keys sorted).

  Raises:
    dicom_store_util.DicomInstanceMetadataRetrievalError: Unable to retrieve
      metadata for DICOM instance from the store.
  """
  downloaded_metadata = dicom_store_util.download_instance_return_metadata(
      user_auth,
      dicom_web_base_url,
      uid.study_instance_uid,
      uid.series_instance_uid,
      uid.sop_instance_uid,
      ['PixelData'],
  )
  if old_metadata is None:
    result_metadata = downloaded_metadata
  else:
    result_metadata = _patch_metadata(old_metadata, downloaded_metadata)
  serialized = json.dumps(result_metadata, sort_keys=True)
  manager.inc_data_downloaded(len(serialized))
  return serialized
def download_return_annotation_metadata(
    dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
    study_uid: dicom_url_util.StudyInstanceUID,
    series_uid: dicom_url_util.SeriesInstanceUID,
    instance_uid: dicom_url_util.SOPInstanceUID,
    metadata: MutableMapping[str, Any],
    patch_missing_metadata_tags: bool,
    manager: dicom_store_util.MetadataThreadPoolDownloadManager,
) -> Union[futures.Future[str], str]:
  """Returns, or schedules the download of, annotation instance metadata.

  Args:
    dicom_web_base_url: Base DICOMweb URL for store.
    study_uid: StudyInstanceUID for request or if empty init from metadata.
    series_uid: SeriesInstanceUID for request or if empty init from metadata.
    instance_uid: SOPInstanceUID for request or if empty init from metadata.
    metadata: DICOM instance json formatted metadata.
    patch_missing_metadata_tags: If True, the downloaded metadata is used to
      patch the passed metadata; short term fix for DICOM store bug which
      results in store omitting short binary tags from bulk uri response.
    manager: Dicom instance thread pool manager.

  Returns:
    The passed metadata as a JSON formatted string if the instance uid is not
    fully defined; otherwise the result of submitting the metadata download
    to the manager.

  Raises:
    dicom_store_util.DicomInstanceMetadataRetrievalError: Unable to retrieve
      metadata for DICOM instance from the store.
  """
  resolved_uid = metadata_util.get_metadata_uid(
      study_uid, series_uid, instance_uid, metadata
  )
  if not resolved_uid.is_defined():
    # The instance cannot be addressed; return the metadata as given.
    return json.dumps(metadata, sort_keys=True)
  cloud_logging_client.debug(
      'Returning metadata for bulk microscopy simple annotation'
  )
  session = user_auth_util.AuthSession(flask_util.get_headers())
  metadata_to_patch = metadata if patch_missing_metadata_tags else None
  return manager.submit(
      _get_annotation_metadata,
      session,
      dicom_web_base_url,
      resolved_uid,
      metadata_to_patch,
      manager,
  )