pathology/dicom_proxy/user_auth_util.py (161 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utility for user auth and session creation."""
import os
import threading
from typing import Any, Mapping, MutableMapping, Optional
import cachetools
import google.auth
import google.auth.transport.requests
import requests
from pathology.dicom_proxy import dicom_proxy_flags
from pathology.dicom_proxy import flask_util
from pathology.dicom_proxy import proxy_const
from pathology.dicom_proxy import redis_cache
from pathology.shared_libs.logging_lib import cloud_logging_client
_USER_INFO_REQUEST_URL = 'https://www.googleapis.com/userinfo/v2/me'
_EMAIL_DERIVED_FROM_BEARER_TOKEN = 'EMAIL_DERIVED_FROM_BEARER_TOKEN'
class UserEmailRetrievalError(Exception):
"""Unable to determine user email."""
_auth_cache = cachetools.LRUCache(maxsize=1)
_auth_cache_lock = threading.Lock()
def _init_fork_module_state() -> None:
global _auth_cache
global _auth_cache_lock
_auth_cache = cachetools.LRUCache(maxsize=1)
_auth_cache_lock = threading.Lock()
def _add_key_if_defined_in_source(
dest: MutableMapping[str, Any], source: Mapping[str, Any], key: str
) -> None:
value = source.get(key.lower())
if value is not None:
dest[key] = value
@cachetools.cached(_auth_cache, lock=_auth_cache_lock)
def _get_email_from_bearer_token(bearer_token: str) -> str:
"""Return Email address associated with OAuth bearer token.
Args:
bearer_token: OAuth Bearer Token.
Returns:
Email address.
Raises:
UserEmailRetrievalError: Unable to resolve email address.
"""
cache = redis_cache.RedisCache()
token_key = f'email:{bearer_token}'
email = cache.get(token_key)
if email is not None and email.value is not None:
return email.value.decode('utf-8')
response = None
try:
response = requests.get(
_USER_INFO_REQUEST_URL,
headers={
proxy_const.HeaderKeywords.AUTH_HEADER_KEY: bearer_token,
},
)
json_response = response.json()
email = json_response['email']
if not json_response['verified_email']:
msg = f'Email: {email} is not verified.'
cloud_logging_client.error(msg, {'e-mail': email})
raise UserEmailRetrievalError(msg)
cache.set(
token_key,
email,
ttl_sec=dicom_proxy_flags.USER_LEVEL_METADATA_TTL_FLG.value,
)
return email
except (
requests.exceptions.RequestException,
requests.exceptions.JSONDecodeError,
KeyError,
) as exc:
cloud_logging_client.error(
'Unable to retrieve user email.',
exc,
{'response_received': response.text, 'token': bearer_token}
if response is not None
else {'token': bearer_token},
)
raise UserEmailRetrievalError('Unable to retrieve user email.') from exc
class AuthSession:
"""Wraps credentials passed to initiate a DICOM Proxy downsampling request."""
def __init__(
self,
authorization_header: Optional[Mapping[str, str]],
):
self._auth_dict = {}
self._userid_dict = {}
if authorization_header is not None and authorization_header:
norm_dict = flask_util.norm_dict_keys(
authorization_header,
[
proxy_const.HeaderKeywords.AUTH_HEADER_KEY,
proxy_const.HeaderKeywords.AUTHORITY_HEADER_KEY,
proxy_const.HeaderKeywords.IAP_EMAIL_KEY,
proxy_const.HeaderKeywords.IAP_USER_ID_KEY,
],
)
_add_key_if_defined_in_source(
self._auth_dict, norm_dict, proxy_const.HeaderKeywords.AUTH_HEADER_KEY
)
_add_key_if_defined_in_source(
self._auth_dict,
norm_dict,
proxy_const.HeaderKeywords.AUTHORITY_HEADER_KEY,
)
if dicom_proxy_flags.VALIDATE_IAP_FLG.value:
_add_key_if_defined_in_source(
self._userid_dict,
norm_dict,
proxy_const.HeaderKeywords.IAP_EMAIL_KEY,
)
_add_key_if_defined_in_source(
self._userid_dict,
norm_dict,
proxy_const.HeaderKeywords.IAP_USER_ID_KEY,
)
else:
bearer_token = self._auth_dict.get(
proxy_const.HeaderKeywords.AUTH_HEADER_KEY
)
if bearer_token is not None:
self._userid_dict[_EMAIL_DERIVED_FROM_BEARER_TOKEN] = (
_get_email_from_bearer_token(bearer_token)
)
if (
dicom_proxy_flags.ENABLE_APPLICATION_DEFAULT_CREDENTIALS_FLG.value
and proxy_const.HeaderKeywords.AUTH_HEADER_KEY not in self._auth_dict
):
# User default credentials returns the credentials of the GKE service
# account and not the calling user. The purpose of this code path is to
# enable e2e testing of the IAP enabled application in chrome in dev.
# For this pathway to work the tile-server service account must have
# read/write permissions on the DICOM store. When enabled the pathway can
# be activated via chrome without the need to supply a valid user bearer
# token. When deployed in the customers environment (production) the
# tile-server service account should not have permission to access the
# DICOM Store and the ENABLE_APPLICATION_DEFAULT_CREDENTIALS_FLG.value
# should be set to false.
self._init_to_service_account_credentials()
def _init_to_service_account_credentials(self) -> None:
"""Initializes service account default credentials."""
cloud_logging_client.info('Retrieving service account credentials.')
credentials, _ = google.auth.default()
# Credentials need to be refreshed to return a bearer token.
credentials.refresh(google.auth.transport.requests.Request())
bearer_token = f'Bearer {credentials.token}'
self._auth_dict[proxy_const.HeaderKeywords.AUTH_HEADER_KEY] = bearer_token
self._auth_dict[proxy_const.HeaderKeywords.AUTHORITY_HEADER_KEY] = ''
try:
self._userid_dict[_EMAIL_DERIVED_FROM_BEARER_TOKEN] = (
_get_email_from_bearer_token(bearer_token)
)
except UserEmailRetrievalError:
self._userid_dict[_EMAIL_DERIVED_FROM_BEARER_TOKEN] = (
'proxy-service-account'
)
@property
def email(self) -> str:
"""Returns email associated with token."""
email = self._userid_dict.get(_EMAIL_DERIVED_FROM_BEARER_TOKEN)
if email is not None:
return email
email = self._userid_dict.get(proxy_const.HeaderKeywords.IAP_EMAIL_KEY)
if email is not None:
return email
raise UserEmailRetrievalError('User email is unknown.')
@property
def iap_user_id(self) -> str:
"""Returns IAP user id."""
return self._userid_dict.get(proxy_const.HeaderKeywords.IAP_USER_ID_KEY, '')
@property
def authorization(self) -> str:
return self._auth_dict.get(proxy_const.HeaderKeywords.AUTH_HEADER_KEY, '')
@property
def authority(self) -> str:
return self._auth_dict.get(
proxy_const.HeaderKeywords.AUTHORITY_HEADER_KEY, ''
)
def add_to_header(
self, request_header: Optional[Mapping[str, str]]
) -> Mapping[str, str]:
"""Adds authentication token and scope to request header.
Args:
request_header: header to add auth to.
Returns:
header with auth
"""
if request_header is None:
request_header = {}
return_header = dict(request_header)
return_header.update(self._auth_dict)
return return_header
# The digitial_pathology_dicom proxy runs using gunicorn, which forks worker
# processes. Forked processes do not re-init global state and assume their
# values at the time of the fork. This can result in forked modules being
# started with invalid global state, e.g., acquired locks that will not release
# or references state. os.register at fork, defines a function run in child
# forked processes following the fork to re-initalize the forked global module
# state.
os.register_at_fork(after_in_child=_init_fork_module_state)