# pathology/dicom_proxy/dicom_store_util.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utility functions to interact with DICOM Store."""
from __future__ import annotations
from concurrent import futures
import contextlib
import http
import io
import re
import tempfile
import threading
import typing
from typing import Any, Callable, IO, Iterator, List, Mapping, Optional, Sequence, Union
import flask
import pydicom
import requests
from pathology.dicom_proxy import base_dicom_request_error
from pathology.dicom_proxy import dicom_proxy_flags
from pathology.dicom_proxy import dicom_url_util
from pathology.dicom_proxy import flask_util
from pathology.dicom_proxy import logging_util
from pathology.dicom_proxy import proxy_const
from pathology.dicom_proxy import user_auth_util
from pathology.shared_libs.logging_lib import cloud_logging_client
AuthSession = user_auth_util.AuthSession
_UNSUPPORTED_PROXY_PARAMS = frozenset([
'disable_caching',
'downsample',
'embed_iccprofile',
'iccprofile',
'interpolation',
'quality',
])
_PARSE_URL = re.compile(r'(http.*)/?\?(.*)')
class DicomInstanceMetadataRetrievalError(Exception):
pass
class MetadataDownloadExceedsMaxSizeLimitError(Exception):
pass
class DicomInstanceRequestError(base_dicom_request_error.BaseDicomRequestError):
"""Exception which wraps error responses from DICOM store."""
def __init__(self, response: requests.Response, msg: Optional[str] = None):
super().__init__('DicomInstanceRequestError', response, msg)
class DicomMetadataRequestError(base_dicom_request_error.BaseDicomRequestError):
"""Exception which wraps error responses from DICOM store."""
def __init__(self, response: requests.Response, msg: Optional[str] = None):
super().__init__('DicomMetadataRequestError', response, msg)
class MetadataThreadPoolDownloadManagerError(Exception):
pass
class MetadataThreadPoolDownloadManager(contextlib.ExitStack):
"""Manages async download dicom metadata."""
def __init__(self, num_threads: int, max_data_size: int):
super().__init__()
self._max_threads = min(
num_threads,
dicom_proxy_flags.MAX_AUGMENTED_METADATA_DOWNLOAD_THREAD_COUNT_FLG.value,
)
self._pool = None
self._total_size = 0
self._lock = threading.Lock()
self._max_data_size = max_data_size
self._future_list = []
self._context_entered = False
self.callback(self._close_context)
def _close_context(self) -> None:
if self._future_list:
futures.wait(self._future_list, return_when=futures.FIRST_EXCEPTION)
self._future_list.clear()
self._context_entered = False
def __enter__(self):
self._context_entered = True
return super().__enter__()
def _get_thread_pool(self) -> futures.ThreadPoolExecutor:
if self._pool is not None:
return self._pool
self._pool = self.enter_context(
futures.ThreadPoolExecutor(max_workers=self._max_threads)
)
return self._pool
def inc_data_downloaded(self, size: int) -> int:
if not self._context_entered:
raise MetadataThreadPoolDownloadManagerError('Not in context block.')
with self._lock:
self._total_size += size
if self._total_size > self._max_data_size:
raise MetadataDownloadExceedsMaxSizeLimitError()
return self._total_size
def map(
self, func: Callable[..., Any], args: Union[Sequence[Any], Iterator[Any]]
) -> Sequence[Union[futures.Future[str], str]]:
return [self.submit(func, arg) for arg in args]
def submit(
self, func: Callable[..., Any], *args
) -> Union[futures.Future[str], str]:
if not self._context_entered:
raise MetadataThreadPoolDownloadManagerError('Not in context block.')
if self._max_threads < 2:
return func(*args)
else:
future_metadata = self._get_thread_pool().submit(func, *args)
self._future_list.append(future_metadata)
return future_metadata
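# Illustrative sketch (not part of this module): the download manager is used
# as a context manager; worker callables report downloaded byte counts via
# inc_data_downloaded, which raises MetadataDownloadExceedsMaxSizeLimitError
# once max_data_size is exceeded. The names `fetch_metadata` and
# `instance_uids` below are hypothetical.
#
#   def fetch_metadata(
#       mgr: MetadataThreadPoolDownloadManager, instance_uid: str
#   ) -> str:
#     metadata_json = '{}'  # e.g. result of a DICOMweb metadata request.
#     mgr.inc_data_downloaded(len(metadata_json))
#     return metadata_json
#
#   with MetadataThreadPoolDownloadManager(
#       num_threads=4, max_data_size=10_000_000
#   ) as mgr:
#     results = mgr.map(lambda uid: fetch_metadata(mgr, uid), instance_uids)
#     # Results are futures when more than one thread is used; otherwise they
#     # are the returned values themselves.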
def _download_bytes(
query: dicom_url_util.DicomStoreTransaction,
base_log: Mapping[str, Any],
output: Union[str, IO[bytes]],
) -> int:
"""Downloads binary data from DICOM to file or file like object.
Args:
query: DICOM web query to perform.
base_log: Structured logging to include.
    output: Path to write downloaded DICOM data to or a file-like object.
Returns:
Data size in bytes downloaded.
Raises:
DicomInstanceRequestError: Error downloading DICOM instance.
"""
bytes_downloaded = 0
with requests.Session() as session:
response = session.get(query.url, headers=query.headers, stream=True)
try:
response.raise_for_status()
chunk_size = (
dicom_proxy_flags.DICOM_INSTANCE_DOWNLOAD_STREAMING_CHUNKSIZE_FLG.value
)
if isinstance(output, str):
with open(output, 'wb') as outfile:
for chunk in response.iter_content(chunk_size=chunk_size):
response.raise_for_status()
outfile.write(chunk)
bytes_downloaded += len(chunk)
else:
for chunk in response.iter_content(chunk_size=chunk_size):
response.raise_for_status()
output.write(chunk)
bytes_downloaded += len(chunk)
output.seek(0)
cloud_logging_client.info('Downloaded DICOM instance.', base_log)
return bytes_downloaded
except requests.exceptions.HTTPError as exp:
cloud_logging_client.error(
          'Error occurred downloading DICOM instance.', base_log, exp
)
raise DicomInstanceRequestError(response) from exp
def download_bulkdata(
user_auth: user_auth_util.AuthSession,
bulk_data_uri: str,
output: Union[str, IO[bytes]],
) -> int:
"""Downloads DICOM bulkdata from bulkdata uri.
Args:
user_auth: User authentication to use to download instance.
    bulk_data_uri: Bulk data URI to download data from.
    output: Path to write downloaded data to or a file-like object.
Returns:
Data size in bytes downloaded.
Raises:
DicomInstanceRequestError: Error downloading DICOM instance.
"""
query = dicom_url_util.download_bulkdata(user_auth, bulk_data_uri)
base_log = {
proxy_const.LogKeywords.DICOMWEB_URL: query.url,
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email,
}
return _download_bytes(query, base_log, output)
def download_dicom_instance(
user_auth: user_auth_util.AuthSession,
dicom_series_url: dicom_url_util.DicomSeriesUrl,
found_instance: dicom_url_util.SOPInstanceUID,
output: Union[str, IO[bytes]],
) -> int:
"""Downloads DICOM instance from store to container.
Args:
user_auth: User authentication to use to download instance.
dicom_series_url: DICOM web url to series containing instance.
found_instance: SOP Instance UID to download.
    output: Path to write downloaded DICOM instance to or a file-like object.
  Returns:
    Data size in bytes downloaded.
Raises:
DicomInstanceRequestError: Error downloading DICOM instance.
"""
url = dicom_url_util.series_dicom_instance_url(
dicom_series_url, found_instance
)
query = dicom_url_util.download_dicom_instance_not_transcoded(user_auth, url)
base_log = {
proxy_const.LogKeywords.DICOMWEB_URL: query.url,
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email,
}
return _download_bytes(query, base_log, output)
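# Illustrative sketch: a caller might stream an instance into an in-memory
# buffer and hand it to pydicom; `auth`, `series_url` and `instance_uid` are
# hypothetical values assembled elsewhere by the proxy.
#
#   with io.BytesIO() as buf:
#     download_dicom_instance(auth, series_url, instance_uid, buf)
#     dcm = pydicom.dcmread(buf)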
def download_instance_return_metadata(
user_auth: user_auth_util.AuthSession,
dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
study_instance_uid: str,
series_instance_uid: str,
sop_instance_uid: str,
exclude_tags: Optional[List[str]] = None,
) -> Mapping[str, Any]:
"""Flask entry point for request to retrieve an instance annotation.
Args:
user_auth: User authentication to use to download instance.
dicom_web_base_url: Base DICOMweb URL for store.
study_instance_uid: DICOM study instance UID.
series_instance_uid: DICOM series instance UID.
    sop_instance_uid: DICOM SOP Instance UID.
    exclude_tags: Tags to exclude from the response.
  Returns:
    JSON metadata.
Raises:
DicomInstanceMetadataRetrievalError: Unable to download or unable to read
DICOM.
"""
series_url = dicom_url_util.base_dicom_series_url(
dicom_web_base_url,
dicom_url_util.StudyInstanceUID(study_instance_uid),
dicom_url_util.SeriesInstanceUID(series_instance_uid),
)
exclude_tags = exclude_tags if exclude_tags is not None else []
with tempfile.TemporaryFile() as dicom_instance:
try:
download_dicom_instance(
user_auth,
series_url,
dicom_url_util.SOPInstanceUID(sop_instance_uid),
dicom_instance,
)
except DicomInstanceRequestError as exp:
raise DicomInstanceMetadataRetrievalError( # pylint: disable=bad-exception-cause
'Error downloading DICOM instance'
) from exp
try:
with pydicom.dcmread(dicom_instance) as dcm:
for tag in exclude_tags:
try:
del dcm[tag]
except (KeyError, ValueError, TypeError) as _:
pass
base_dict = dcm.to_json_dict()
base_dict.update(dcm.file_meta.to_json_dict())
return base_dict
except pydicom.errors.InvalidDicomError as exp:
raise DicomInstanceMetadataRetrievalError(
'Error decoding DICOM instance'
) from exp
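# Illustrative sketch: callers commonly strip bulk tags such as PixelData
# before returning instance metadata; all parameter values below are
# hypothetical.
#
#   metadata = download_instance_return_metadata(
#       auth,
#       dicom_web_base_url,
#       study_uid,
#       series_uid,
#       sop_uid,
#       exclude_tags=['PixelData'],
#   )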
def _get_dicom_json(
query: dicom_url_util.DicomStoreTransaction,
base_log: Mapping[str, Any],
) -> List[Mapping[str, Any]]:
"""Returns DICOM json metadata.
Args:
    query: DICOMweb query to perform.
base_log: Structured logging to include with request logs.
Returns:
List of DICOM instance metadata (JSON) stored in series.
Raises:
DicomMetadataRequestError: HTTP error or error decoding dicomWeb response.
"""
response = requests.get(query.url, headers=query.headers, stream=False)
try:
response.raise_for_status()
metadata = response.json()
cloud_logging_client.info(
f'Retrieved DICOM JSON: {query.url}',
base_log,
{proxy_const.LogKeywords.DICOMWEB_URL: query.url},
)
return metadata
except requests.exceptions.HTTPError as exp:
cloud_logging_client.error(
        'Error occurred downloading DICOM instance metadata.',
base_log,
exp,
{
proxy_const.LogKeywords.DICOMWEB_RESPONSE: response.text,
proxy_const.LogKeywords.DICOMWEB_URL: query.url,
},
)
raise DicomMetadataRequestError(response) from exp
except requests.exceptions.JSONDecodeError as exp:
cloud_logging_client.error(
        'Error occurred decoding DICOM instance metadata JSON.',
base_log,
exp,
{
proxy_const.LogKeywords.DICOMWEB_RESPONSE: response.text,
proxy_const.LogKeywords.DICOMWEB_URL: query.url,
},
)
raise DicomMetadataRequestError(response, 'invalid JSON response.') from exp
def get_series_instance_tags(
user_auth: user_auth_util.AuthSession,
dicom_series_url: dicom_url_util.DicomSeriesUrl,
additional_tags: Optional[List[str]] = None,
) -> List[Mapping[str, Any]]:
"""Returns instance tag metadata for all instances in a series.
Args:
user_auth: User authentication to use to download instance.
dicom_series_url: DICOM web url to series containing instance.
    additional_tags: Additional DICOM tags (keywords) to request metadata for.
Returns:
List of DICOM instance metadata (JSON) stored in series.
Raises:
DicomMetadataRequestError: HTTP error or error decoding dicomWeb response.
"""
return _get_dicom_json(
dicom_url_util.dicom_instance_tag_query(
user_auth, dicom_series_url, None, additional_tags
),
{proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email},
)
def get_instance_tags(
user_auth: user_auth_util.AuthSession,
dicom_series_url: dicom_url_util.DicomSeriesUrl,
instance: dicom_url_util.SOPInstanceUID,
additional_tags: Optional[List[str]] = None,
) -> List[Mapping[str, Any]]:
"""Returns instance tag metadata.
Args:
user_auth: User authentication to use to download instance.
dicom_series_url: DICOM web url to series containing instance.
instance: DICOM instance in series to return metadata for.
    additional_tags: Additional DICOM tags (keywords) to request metadata for.
Returns:
DICOM instance metadata (JSON) [does not include bulkdatauri].
Raises:
DicomMetadataRequestError: HTTP error or error decoding dicomWeb response.
"""
return _get_dicom_json(
dicom_url_util.dicom_instance_tag_query(
user_auth, dicom_series_url, instance, additional_tags
),
{proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email},
)
def get_dicom_instance_metadata(
user_auth: user_auth_util.AuthSession,
dicom_series_url: dicom_url_util.DicomSeriesUrl,
instance: dicom_url_util.SOPInstanceUID,
) -> List[Mapping[str, Any]]:
"""Returns instance level metadata.
Args:
user_auth: User authentication to use to download instance.
dicom_series_url: DICOM web url to series containing instance.
instance: DICOM instance in series to return metadata for.
Returns:
DICOM instance metadata (JSON) [includes bulkdatauri].
Raises:
DicomMetadataRequestError: HTTP error or error decoding dicomWeb response.
"""
return _get_dicom_json(
dicom_url_util.dicom_instance_metadata_query(
user_auth, dicom_series_url, instance
),
{proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email},
)
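# Note: per the docstrings above, get_instance_tags issues a tag query (no
# bulk data URIs) while get_dicom_instance_metadata issues a metadata query
# (bulk data URIs included). Illustrative sketch with hypothetical `auth`,
# `series_url` and `instance_uid`:
#
#   tags = get_instance_tags(auth, series_url, instance_uid)
#   metadata = get_dicom_instance_metadata(auth, series_url, instance_uid)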
def get_dicom_study_instance_metadata(
user_auth: user_auth_util.AuthSession,
dicom_study_url: dicom_url_util.DicomStudyUrl,
instance: dicom_url_util.SOPInstanceUID,
) -> List[Mapping[str, Any]]:
"""Returns instance level metadata.
Args:
user_auth: User authentication to use to download instance.
dicom_study_url: DICOM web url for study containing instance.
    instance: DICOM instance in study to return metadata for.
Returns:
DICOM instance metadata (JSON) [includes bulkdatauri].
Raises:
DicomMetadataRequestError: HTTP error or error decoding dicomWeb response.
"""
return _get_dicom_json(
dicom_url_util.dicom_instance_tag_query(
user_auth, dicom_study_url, instance
),
{proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email},
)
def get_dicom_study_series_metadata(
user_auth: user_auth_util.AuthSession,
dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
study_instance_uid: dicom_url_util.StudyInstanceUID,
) -> Sequence[Mapping[str, Any]]:
"""Returns study serieslevel metadata."""
return _get_dicom_json(
dicom_url_util.dicom_get_study_series_metadata_query(
user_auth, dicom_web_base_url, study_instance_uid
),
{proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email},
)
def get_dicom_study_metrics(
user_auth: user_auth_util.AuthSession,
dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
study_instance_uid: dicom_url_util.StudyInstanceUID,
) -> Mapping[str, Any]:
"""Returns dicom study level metrics."""
return typing.cast(
Mapping[str, Any],
_get_dicom_json(
dicom_url_util.dicom_get_study_metrics_query(
user_auth, dicom_web_base_url, study_instance_uid
),
{
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: (
user_auth.email
)
},
),
)
def get_dicom_series_metrics(
user_auth: user_auth_util.AuthSession,
dicom_web_base_url: dicom_url_util.DicomWebBaseURL,
study_instance_uid: dicom_url_util.StudyInstanceUID,
series_instance_uid: dicom_url_util.SeriesInstanceUID,
) -> Mapping[str, Any]:
"""Returns dicom study level metrics."""
return typing.cast(
Mapping[str, Any],
_get_dicom_json(
dicom_url_util.dicom_get_series_metrics_query(
user_auth,
dicom_web_base_url,
study_instance_uid,
series_instance_uid,
),
{
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: (
user_auth.email
)
},
),
)
def delete_instance_from_dicom_store(
user_auth: user_auth_util.AuthSession, dicom_path: str
) -> flask.Response:
"""Deletes instance from dicom store within a session.
Args:
user_auth: User_auth used to delete instance to DICOM Store
dicom_path: DICOMweb instance path of instance to delete.
Returns:
Delete Response
"""
base_log = {
proxy_const.LogKeywords.DICOMWEB_URL: dicom_path,
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email,
}
try:
headers = user_auth.add_to_header(
{'Content-Type': 'application/dicom+json; charset=utf-8'}
)
response = requests.delete(dicom_path, headers=headers)
response.raise_for_status()
cloud_logging_client.info(
f'Deleted instance {dicom_path} from DICOM Store.', base_log
)
except requests.HTTPError as exc:
cloud_logging_client.error(
f'Failed to delete instance {dicom_path}.', base_log, exc
)
response = exc.response
return flask.Response(
response.text,
status=response.status_code,
content_type=flask_util.get_key_value(
response.headers, 'content-type', ''
),
)
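# Illustrative sketch: `auth` and the DICOMweb instance path below are
# hypothetical; the returned flask.Response mirrors the store's DELETE result.
#
#   response = delete_instance_from_dicom_store(
#       auth,
#       f'{store_url}/studies/{study}/series/{series}/instances/{sop}',
#   )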
def upload_instance_to_dicom_store(
user_auth: user_auth_util.AuthSession,
dataset: pydicom.dataset.Dataset,
dicom_path: str,
headers: Optional[Mapping[str, str]] = None,
) -> flask.Response:
"""Adds headers and uploads to dicom store within a session.
Args:
user_auth: User auth used to upload instance to DICOM Store
dataset: DICOM dataset to upload.
dicom_path: DICOMweb study path to upload to.
headers: Optional headers to include with request.
Returns:
POST Response.
"""
if headers is None:
headers = {}
flask_util.add_key_value_to_dict(headers, 'Content-Type', 'application/dicom')
headers = user_auth.add_to_header(headers)
with io.BytesIO() as f:
dataset.save_as(f)
    # Get the buffer contents as bytes, independent of the BytesIO buffer.
data = f.getvalue()
base_log = {
proxy_const.LogKeywords.DICOMWEB_URL: dicom_path,
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email,
}
try:
response = requests.post(dicom_path, data=data, headers=headers)
response.raise_for_status()
cloud_logging_client.info(
'Uploaded DICOM instance to DICOM store.', base_log
)
return flask.Response(
response.text,
status=response.status_code,
content_type=flask_util.get_key_value(
response.headers, 'content-type', ''
),
)
except requests.HTTPError as exp:
if exp.response.status_code == http.HTTPStatus.CONFLICT:
cloud_logging_client.warning(
(
'Cannot create DICOM Annotation. UID triple (StudyInstanceUID, '
'SeriesInstanceUID, SOPInstanceUID) already exists in store.'
),
base_log,
exp,
)
else:
cloud_logging_client.warning(
'Error occurred when trying to create the annotation.', base_log, exp
)
return flask.Response(
exp.response.text,
status=exp.response.status_code,
content_type=flask_util.get_key_value(
exp.response.headers, 'content-type', ''
),
)
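# Illustrative sketch: an instance previously read with pydicom can be
# re-uploaded to a DICOMweb studies endpoint; `auth`, `store_url` and the
# file path below are hypothetical.
#
#   dcm = pydicom.dcmread('/tmp/instance.dcm')
#   response = upload_instance_to_dicom_store(
#       auth, dcm, f'{store_url}/studies'
#   )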
def upload_multipart_to_dicom_store(
user_auth: user_auth_util.AuthSession,
multipart_data: bytes,
content_type: str,
dicom_path: str,
headers: Optional[Mapping[str, str]] = None,
) -> flask.Response:
"""Uploads multipart dicom to dicom store within a session.
Args:
user_auth: User auth used to upload instance to DICOM Store
multipart_data: Multipart data to upload.
content_type: Multipart content type.
dicom_path: DICOMweb study path to upload to.
headers: Optional headers to include with request.
Returns:
POST Response.
"""
if headers is None:
headers = {}
flask_util.add_key_value_to_dict(headers, 'Content-Type', content_type)
headers = user_auth.add_to_header(headers)
base_log = {
proxy_const.LogKeywords.DICOMWEB_URL: dicom_path,
proxy_const.LogKeywords.USER_EMAIL_OF_DICOMWEB_REQUEST: user_auth.email,
}
try:
response = requests.post(dicom_path, data=multipart_data, headers=headers)
response.raise_for_status()
    cloud_logging_client.info(
        'Uploaded multipart data to DICOM store.', base_log
    )
return flask.Response(
response.text,
status=response.status_code,
content_type=flask_util.get_key_value(
response.headers, 'content-type', ''
),
)
except requests.HTTPError as exp:
if exp.response.status_code == http.HTTPStatus.CONFLICT:
      cloud_logging_client.warning(
          (
              'Cannot create DICOM Annotation. UID triple (StudyInstanceUID, '
              'SeriesInstanceUID, SOPInstanceUID) already exists in store.'
          ),
          base_log,
          exp,
      )
    else:
      cloud_logging_client.warning(
          'Error occurred when trying to create the annotation.',
          base_log,
          exp,
      )
return flask.Response(
exp.response.text,
status=exp.response.status_code,
content_type=flask_util.get_key_value(
exp.response.headers, 'content-type', ''
),
)
def _remove_dicom_proxy_url_path_prefix(url_path: str) -> str:
"""Removes tileserver URL prefix from url path."""
if url_path.startswith(dicom_proxy_flags.PROXY_SERVER_URL_PATH_PREFIX):
return url_path[len(dicom_proxy_flags.PROXY_SERVER_URL_PATH_PREFIX) :]
return url_path
def _get_flask_full_path_removing_unsupported_proxy_params() -> str:
"""Returns flask path removing unsupported proxy query parameters."""
base_url = flask_util.get_path()
flask_args = flask_util.get_key_args_list()
if not flask_args:
return base_url
returned_params = []
for key, value_list in flask_args.items():
if key.lower() in _UNSUPPORTED_PROXY_PARAMS:
continue
returned_params.extend([f'{key}={value}' for value in value_list])
if not returned_params:
return base_url
returned_params = '&'.join(returned_params)
return f'{base_url}?{returned_params}'
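# Illustrative example: for an incoming request path such as
# '/studies/1.2.3/instances?downsample=2.0&includefield=ALL' the downsample
# parameter is proxy-only (see _UNSUPPORTED_PROXY_PARAMS) and is removed,
# yielding '/studies/1.2.3/instances?includefield=ALL'.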
def _stream_proxy_content(response: requests.Response) -> Iterator[bytes]:
"""Generator streams bytes from proxied DICOMweb request."""
for content in response.iter_content(
chunk_size=dicom_proxy_flags.DICOM_INSTANCE_DOWNLOAD_STREAMING_CHUNKSIZE_FLG.value
):
yield content
return None
def _add_parameters_to_url(url: str, params_to_add: List[str]) -> str:
"""Returns URL with parameters added to it."""
if url[-1] == '?':
url = url[:-1]
params = ''
else:
match = _PARSE_URL.fullmatch(url)
if not match:
params = ''
else:
url, params = match.groups()
url = url.rstrip('/')
params_to_add = '&'.join(params_to_add)
if params:
params = f'{params}&{params_to_add}'
else:
params = params_to_add
return f'{url}?{params}'
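# Illustrative behavior (hypothetical URLs):
#
#   _add_parameters_to_url('https://host/dicomWeb/studies', ['limit=1'])
#     -> 'https://host/dicomWeb/studies?limit=1'
#   _add_parameters_to_url(
#       'https://host/dicomWeb/studies?limit=1', ['offset=2']
#   )
#     -> 'https://host/dicomWeb/studies?limit=1&offset=2'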
def dicom_store_proxy(params: Optional[List[str]] = None) -> flask.Response:
"""Processes generic DICOM Store proxy request and return results.
Args:
params: Optional parameters to add to proxy request.
Returns:
flask.Response
"""
method = flask_util.get_method()
  # Convert the proxy web request URL into a URL against the DICOM store.
full_path = _get_flask_full_path_removing_unsupported_proxy_params()
cloud_logging_client.info(f'Entry Proxy request: {full_path}')
if params is not None and params:
full_path = _add_parameters_to_url(full_path, params)
url = _remove_dicom_proxy_url_path_prefix(full_path)
url = dicom_url_util.get_dicom_store_url(url)
  # The accept (key):value pair in the HTTP request defines the image/data
  # format the client is requesting. Copy the header key:value pairs from the
  # request and use them in the call to the DICOM store.
header = flask_util.norm_dict_keys(
flask_util.get_headers(),
[
flask_util.ACCEPT_HEADER_KEY,
flask_util.CONTENT_TYPE_HEADER_KEY,
flask_util.CONTENT_LENGTH_HEADER_KEY,
flask_util.ACCEPT_ENCODING_KEY,
],
)
headers = AuthSession(flask_util.get_headers()).add_to_header(header)
cloud_logging_client.info(
f'Proxied URL: {url}',
{
'proxy_headers': logging_util.mask_privileged_header_values(headers),
'proxy_method': method,
},
)
  # Proxy the request using the same HTTP method it was called with.
  # Stream data from the proxy to the caller to avoid fully loading the
  # response in the server.
if method == flask_util.GET:
response = requests.get(url, headers=headers, stream=True)
elif method == flask_util.POST:
response = requests.post(
url, headers=headers, stream=True, data=flask_util.get_data()
)
elif method == flask_util.DELETE:
response = requests.delete(url, headers=headers, stream=True)
else:
msg = 'Unsupported HTTP request method.'
cloud_logging_client.error(msg, {'method': method, 'url': url})
return flask.Response(
response=str(msg),
status=http.HTTPStatus.BAD_REQUEST.value,
content_type='text/plain',
)
fl_response = flask.Response(
_stream_proxy_content(response),
status=response.status_code,
direct_passthrough=False,
)
fl_response.headers.clear()
for key, value in response.headers.items():
fl_response.headers[key] = value
return fl_response
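# Illustrative sketch (hypothetical Flask wiring, not defined in this module):
# a catch-all route can delegate otherwise unhandled DICOMweb paths to the
# store proxy.
#
#   @flask_app.route(
#       '/<path:unused_path>', methods=['GET', 'POST', 'DELETE']
#   )
#   def _proxy_dicom_store(unused_path: str) -> flask.Response:
#     return dicom_store_proxy()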