pathology/orchestrator/pathology_users_handler.py (344 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Handler for PathologyUsers rpc methods of DigitalPathology service.
Implementation of PathologyUsers rpc methods for DPAS API.
"""
from typing import List, Optional
from google.api_core import exceptions
from google.cloud import spanner
from google.cloud.spanner_v1 import transaction as tr
import grpc
from google.protobuf import field_mask_pb2
from pathology.orchestrator import id_generator
from pathology.orchestrator import logging_util
from pathology.orchestrator import pathology_resources_util
from pathology.orchestrator import rpc_status
from pathology.orchestrator.spanner import schema_resources
from pathology.orchestrator.spanner import spanner_util
from pathology.orchestrator.v1alpha import users_pb2
from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.shared_libs.spanner_lib import cloud_spanner_client
# PathologyUser field names that an update field mask is not allowed to target.
_IMMUTABLE_STATE_FIELDS = ('user_id',)
def _get_alias_names(user: users_pb2.PathologyUser) -> List[str]:
  """Collects the alias string of every PathologyUserAlias on a user.

  Args:
    user: User whose aliases are read.

  Returns:
    The alias names, in the order the aliases appear on the user.
  """
  user_aliases = user.aliases
  return [entry.alias for entry in user_aliases]
def _build_user(
    transaction: tr.Transaction,
    user_id: int,
    view: users_pb2.PathologyUserAliasListView = users_pb2.PATHOLOGY_USER_ALIAS_LIST_VIEW_NO_ALIASES,
    alias_filter: Optional[str] = None,
) -> users_pb2.PathologyUser:
  """Builds a PathologyUser, optionally populated with aliases from Spanner.

  Args:
    transaction: Transaction to run Spanner reads on.
    user_id: User Id for user.
    view: Specifies the alias types to include in response. NO_ALIASES returns
      the user without any alias lookup, NAME_ONLY reads only name aliases, and
      any other value reads email, name and unspecified aliases.
    alias_filter: If set, only aliases containing this substring are kept.

  Returns:
    PathologyUser instance.

  Raises:
    RpcFailureError: If aliases were requested but none are present after
      filtering.
  """
  user = users_pb2.PathologyUser(
      name=pathology_resources_util.convert_user_id_to_name(str(user_id)),
      user_id=str(user_id),
  )
  if view == users_pb2.PATHOLOGY_USER_ALIAS_LIST_VIEW_NO_ALIASES:
    return user

  # The two alias views differ only in which alias types are looked up, so
  # pick the types here and share a single read/filter path below.
  if view == users_pb2.PATHOLOGY_USER_ALIAS_LIST_VIEW_NAME_ONLY:
    alias_types = [users_pb2.PATHOLOGY_USER_ALIAS_TYPE_NAME]
  else:
    alias_types = [
        users_pb2.PATHOLOGY_USER_ALIAS_TYPE_EMAIL,
        users_pb2.PATHOLOGY_USER_ALIAS_TYPE_NAME,
        users_pb2.PATHOLOGY_USER_ALIAS_TYPE_UNSPECIFIED,
    ]

  alias_rows = transaction.read(
      'UserAliases',
      schema_resources.USER_ALIASES_COLS,
      spanner.KeySet([[user_id, alias_type] for alias_type in alias_types]),
      'UsersByTypedAliases',
  )
  for row in alias_rows:
    alias_row = dict(zip(schema_resources.USER_ALIASES_COLS, row))
    # An empty or missing filter keeps every alias; otherwise substring match.
    if not alias_filter or alias_filter in alias_row['UserAlias']:
      user.aliases.append(
          users_pb2.PathologyUserAlias(
              alias=alias_row['UserAlias'], alias_type=alias_row['AliasType']
          )
      )

  if not user.aliases:
    raise rpc_status.RpcFailureError(
        rpc_status.build_rpc_method_status_and_log(
            grpc.StatusCode.NOT_FOUND,
            f'Could not retrieve user {user_id}. No aliases were found in'
            ' database for user with requested filters.',
        )
    )
  return user
def _get_user_by_id(
    transaction: tr.Transaction,
    user_id: int,
    alias_view: users_pb2.PathologyUserAliasListView = users_pb2.PATHOLOGY_USER_ALIAS_LIST_VIEW_NO_ALIASES,
) -> users_pb2.PathologyUser:
  """Looks up a user row by id and builds the matching PathologyUser.

  Args:
    transaction: Transaction to run spanner reads/writes on.
    user_id: Id of user to retrieve.
    alias_view: Specifies the alias types to include in the response.

  Returns:
    PathologyUser built for the given id.

  Raises:
    RpcFailureError: With NOT_FOUND status when a NotFound error is raised
      while checking the Users row or building the user.
  """
  user_key = spanner.KeySet(keys=[[user_id]])
  matching_rows = transaction.read(
      'Users', schema_resources.USERS_COLS, user_key
  )
  try:
    # one() raises NotFound when the Users table has no row for this id.
    matching_rows.one()
    return _build_user(transaction, user_id, alias_view)
  except exceptions.NotFound as exc:
    not_found_status = rpc_status.build_rpc_method_status_and_log(
        grpc.StatusCode.NOT_FOUND,
        f'Could not retrieve user {user_id}.',
        exp=exc,
    )
    raise rpc_status.RpcFailureError(not_found_status) from exc
def insert_user_in_table(
    transaction, user: users_pb2.PathologyUser
) -> users_pb2.PathologyUser:
  """Writes a new Users row plus one UserAliases row per alias of the user.

  The passed-in user is updated in place with the generated id and resource
  name and is also returned.

  Args:
    transaction: Transaction to run spanner read/writes on.
    user: User to insert.

  Returns:
    PathologyUser with name and user_id populated.
  """
  users_cols = schema_resources.USERS_COLS
  new_id = id_generator.OrchestratorIdGenerator.generate_new_id_for_table(
      transaction, 'Users', users_cols
  )

  # Users row first, then each alias row pointing at the new id.
  transaction.insert('Users', users_cols, [[new_id]])
  for user_alias in user.aliases:
    alias_row = [user_alias.alias, new_id, user_alias.alias_type]
    transaction.insert(
        'UserAliases', schema_resources.USER_ALIASES_COLS, [alias_row]
    )

  new_id_str = str(new_id)
  user.name = pathology_resources_util.convert_user_id_to_name(new_id_str)
  user.user_id = new_id_str
  cloud_logging_client.info(f'Creating PathologyUser {user.name}.')
  return user
def _identify_or_create_user(
    transaction: tr.Transaction, alias: str
) -> users_pb2.PathologyUser:
  """Finds the user owning an alias, creating a new user when lookup fails.

  Args:
    transaction: Spanner transaction to run reads/writes on.
    alias: Alias to identify user.

  Returns:
    The existing PathologyUser (without aliases), or a newly inserted user
    whose only alias is the given one with type EMAIL.
  """
  try:
    found_id = spanner_util.search_user_by_alias(alias, transaction)
    cloud_logging_client.info(f'Searching for user with alias {alias}.')
    return _get_user_by_id(
        transaction,
        found_id,
        alias_view=users_pb2.PATHOLOGY_USER_ALIAS_LIST_VIEW_NO_ALIASES,
    )
  except (exceptions.NotFound, rpc_status.RpcFailureError):
    # Either failure during lookup is treated as "no such user yet".
    cloud_logging_client.info(f'Creating new PathologyUser with alias {alias}.')
    email_alias = users_pb2.PathologyUserAlias(
        alias=alias,
        alias_type=users_pb2.PATHOLOGY_USER_ALIAS_TYPE_EMAIL,
    )
    new_user = users_pb2.PathologyUser(aliases=[email_alias])
    return insert_user_in_table(transaction, new_user)
def _list_users(
    transaction: tr.Transaction,
    alias_view: users_pb2.PathologyUserAliasListView = users_pb2.PATHOLOGY_USER_ALIAS_LIST_VIEW_NO_ALIASES,
    alias_filter: str = '',
) -> List[users_pb2.PathologyUser]:
  """Builds a PathologyUser for every row of the Users table.

  Users for which building fails with an RpcFailureError are logged as a
  warning and left out of the result.

  Args:
    transaction: Transaction to run Spanner reads/writes on.
    alias_view: View to return aliases.
    alias_filter: Filter for aliases.

  Returns:
    List of the users that could be built.
  """
  every_user = spanner.KeySet(all_=True)
  user_rows = transaction.read('Users', schema_resources.USERS_COLS, every_user)

  users = []
  for user_row in user_rows:
    try:
      # The first column of the row is passed on as the user id.
      built = _build_user(transaction, user_row[0], alias_view, alias_filter)
    except rpc_status.RpcFailureError as rpc_err:
      cloud_logging_client.warning(rpc_err.status.error_msg)
    else:
      users.append(built)
  return users
def _process_update_mask(update_mask: field_mask_pb2.FieldMask) -> None:
  """Validates that every path in the update mask may be updated.

  Only the 'aliases' path is accepted. Paths naming an immutable state field
  and any other path are rejected.

  Args:
    update_mask: Field mask with fields to update.

  Raises:
    RpcFailureError: With INVALID_ARGUMENT status on the first invalid path.
  """
  for path in update_mask.paths:
    if path in _IMMUTABLE_STATE_FIELDS:
      error_msg = (
          f'Field {path} is an immutable state field and cannot be '
          'manually updated.'
      )
    elif path != 'aliases':
      # No other mutable field in PathologyUser.
      error_msg = (
          f'Invalid mask provided. Field in {update_mask.paths} is '
          'immutable or does not exist for users.'
      )
    else:
      continue
    raise rpc_status.RpcFailureError(
        rpc_status.build_rpc_method_status_and_log(
            grpc.StatusCode.INVALID_ARGUMENT, error_msg
        )
    )
def _update_aliases_in_table(
    transaction, user_id: int, user: users_pb2.PathologyUser
) -> None:
  """Upserts the user's aliases and deletes stored aliases no longer listed.

  Args:
    transaction: Transaction to run spanner read/writes on.
    user_id: User id to associate alias with.
    user: User instance with updated aliases.
  """
  alias_cols = schema_resources.USER_ALIASES_COLS

  # Write every alias currently on the user.
  rows_to_write = []
  for user_alias in user.aliases:
    rows_to_write.append([user_alias.alias, user_id, user_alias.alias_type])
  transaction.insert_or_update('UserAliases', alias_cols, rows_to_write)

  # Read the aliases stored for this user across all alias types.
  typed_keys = spanner.KeySet([
      [user_id, users_pb2.PATHOLOGY_USER_ALIAS_TYPE_EMAIL],
      [user_id, users_pb2.PATHOLOGY_USER_ALIAS_TYPE_NAME],
      [user_id, users_pb2.PATHOLOGY_USER_ALIAS_TYPE_UNSPECIFIED],
  ])
  stored_rows = transaction.read(
      'UserAliases', alias_cols, typed_keys, 'UsersByTypedAliases'
  )

  # Delete every stored alias that is missing from the updated user.
  kept_names = set(_get_alias_names(user))
  alias_col_index = alias_cols.index('UserAlias')
  for stored_row in stored_rows:
    stored_name = stored_row[alias_col_index]
    if stored_name in kept_names:
      continue
    transaction.delete('UserAliases', spanner.KeySet(keys=[[stored_name]]))
class PathologyUsersHandler:
  """Handler for PathologyUsers rpc methods of DigitalPathology service."""

  def __init__(self):
    super().__init__()
    # Client used for all Spanner reads and transactions issued by the handler.
    self._spanner_client = cloud_spanner_client.CloudSpannerClient()

  def _verify_user_aliases(
      self, user_id: int, user: users_pb2.PathologyUser
  ) -> bool:
    """Verifies if the UserAliases rows match the User proto's aliases.

    Args:
      user_id: Id of user.
      user: User instance to match aliases against.

    Returns:
      True if every stored alias row is named on the proto and the number of
      stored rows equals the number of aliases on the proto.
    """
    user_aliases_rows = self._spanner_client.read_data(
        'UserAliases',
        schema_resources.USER_ALIASES_COLS,
        spanner.KeySet([
            [user_id, users_pb2.PATHOLOGY_USER_ALIAS_TYPE_EMAIL],
            [user_id, users_pb2.PATHOLOGY_USER_ALIAS_TYPE_NAME],
            [user_id, users_pb2.PATHOLOGY_USER_ALIAS_TYPE_UNSPECIFIED],
        ]),
        'UsersByTypedAliases',
    )
    alias_names = _get_alias_names(user)
    alias_col_index = schema_resources.USER_ALIASES_COLS.index('UserAlias')
    count = 0
    for row in user_aliases_rows:
      count += 1
      if row[alias_col_index] not in alias_names:
        return False
    return count == len(alias_names)

  def create_pathology_user(
      self, request: users_pb2.CreatePathologyUserRequest
  ) -> users_pb2.PathologyUser:
    """Creates a PathologyUser.

    Args:
      request: A CreatePathologyUserRequest with user to create.

    Returns:
      PathologyUser.

    Raises:
      RpcFailureError: With INVALID_ARGUMENT if the user has no aliases, or
        ALREADY_EXISTS if any of its aliases is already stored.
    """
    cloud_logging_client.set_log_signature(
        logging_util.get_structured_log(
            logging_util.RpcMethodName.CREATE_PATHOLOGY_USER
        )
    )
    if not request.pathology_user.aliases:
      raise rpc_status.RpcFailureError(
          rpc_status.build_rpc_method_status_and_log(
              grpc.StatusCode.INVALID_ARGUMENT,
              'User must have at least one alias.',
          )
      )
    user = request.pathology_user

    # Check if any of aliases already exist.
    alias_names = [[alias.alias] for alias in user.aliases]
    check_aliases_result = self._spanner_client.read_data(
        'UserAliases', ['UserAlias'], spanner.KeySet(keys=alias_names)
    )
    try:
      # Any returned row means one of the requested aliases is already taken.
      for row in check_aliases_result:
        raise rpc_status.RpcFailureError(
            rpc_status.build_rpc_method_status_and_log(
                grpc.StatusCode.ALREADY_EXISTS,
                f'User with alias {row[0]} already exists.',
            )
        )
    except exceptions.NotFound as exp:
      cloud_logging_client.info('Verified that user aliases do not exist.', exp)

    # Insert user and user aliases into table.
    return self._spanner_client.run_in_transaction(insert_user_in_table, user)

  def get_pathology_user(
      self, request: users_pb2.GetPathologyUserRequest, alias: str
  ) -> users_pb2.PathologyUser:
    """Gets a PathologyUser.

    Only the user owning the caller's alias may be retrieved.

    Args:
      request: A GetPathologyUserRequest with user to retrieve.
      alias: Alias of rpc caller.

    Returns:
      PathologyUser.

    Raises:
      RpcFailureError: With PERMISSION_DENIED if the caller is unknown or is
        not the requested user, or on failure to retrieve the user.
    """
    resource_name = request.name
    cloud_logging_client.set_log_signature(
        logging_util.get_structured_log(
            logging_util.RpcMethodName.GET_PATHOLOGY_USER, resource_name
        )
    )
    cloud_logging_client.info(f'Retrieving user {resource_name}.')
    try:
      caller_id = spanner_util.search_user_by_alias(alias)
    except exceptions.NotFound:
      # Unknown caller; rejected by the permission check below.
      caller_id = None
    user_id = pathology_resources_util.get_id_from_name(resource_name)
    if not caller_id or caller_id != user_id:
      raise rpc_status.RpcFailureError(
          rpc_status.build_rpc_method_status_and_log(
              grpc.StatusCode.PERMISSION_DENIED,
              f'User does not have access to requested user {resource_name}.',
          )
      )
    return self._spanner_client.run_in_transaction(
        _get_user_by_id, user_id, request.view
    )

  def identify_current_user(self, alias: str) -> users_pb2.PathologyUser:
    """Identifies the current PathologyUser by alias or creates a new user.

    Args:
      alias: Alias of rpc caller.

    Returns:
      PathologyUser.

    Raises:
      RpcFailureError on failure.
    """
    cloud_logging_client.set_log_signature(
        logging_util.get_structured_log(
            logging_util.RpcMethodName.IDENTIFY_PATHOLOGY_USER
        )
    )
    return self._spanner_client.run_in_transaction(
        _identify_or_create_user, alias
    )

  def list_pathology_users(
      self, request: users_pb2.ListPathologyUsersRequest
  ) -> users_pb2.ListPathologyUsersResponse:
    """Lists PathologyUsers.

    Args:
      request: A ListPathologyUsersRequest; its view and filter are applied to
        the aliases of each user.

    Returns:
      ListPathologyUsersResponse with the users that could be built.
    """
    users = self._spanner_client.run_in_transaction(
        _list_users, request.view, request.filter
    )
    return users_pb2.ListPathologyUsersResponse(pathology_users=users)

  def update_pathology_user(
      self, request: users_pb2.UpdatePathologyUserRequest, alias: str
  ) -> users_pb2.PathologyUser:
    """Updates a PathologyUser.

    Args:
      request: An UpdatePathologyUserRequest with user to update and fields to
        update contained in a field mask.
      alias: Email of rpc caller.

    Returns:
      PathologyUser. When the update mask lists no paths the stored user is
      returned unchanged.

    Raises:
      RpcFailureError: With PERMISSION_DENIED if the caller is not the user to
        update, INVALID_ARGUMENT for an invalid mask, or INTERNAL if the
        Spanner update fails.
    """
    resource_name = request.pathology_user.name
    cloud_logging_client.set_log_signature(
        logging_util.get_structured_log(
            logging_util.RpcMethodName.UPDATE_PATHOLOGY_USER, resource_name
        )
    )
    user_to_update = self.get_pathology_user(
        users_pb2.GetPathologyUserRequest(name=resource_name), alias
    )

    # Verify caller matches user to update.
    caller_id = int(self.identify_current_user(alias).user_id)
    if caller_id != int(user_to_update.user_id):
      raise rpc_status.RpcFailureError(
          rpc_status.build_rpc_method_status_and_log(
              grpc.StatusCode.PERMISSION_DENIED,
              'User to update must match the caller.',
          )
      )

    # A protobuf message field is truthy even when empty, so test the repeated
    # `paths` field to detect a mask that names nothing to update.
    if not request.update_mask.paths:
      cloud_logging_client.info(
          'No fields to update specified in update field mask.'
      )
      return user_to_update

    # Check for invalid arguments, then replace the aliases with the request's.
    _process_update_mask(request.update_mask)
    user_to_update.ClearField('aliases')
    user_to_update.aliases.MergeFrom(request.pathology_user.aliases)

    # Update aliases in spanner.
    try:
      self._spanner_client.run_in_transaction(
          _update_aliases_in_table, caller_id, user_to_update
      )
      cloud_logging_client.info(f'Updated user {user_to_update.name}.')
    except rpc_status.RpcFailureError:
      # Already carries an rpc status; propagate it untouched.
      raise
    except Exception as exc:
      raise rpc_status.RpcFailureError(
          rpc_status.build_rpc_method_status_and_log(
              grpc.StatusCode.INTERNAL, f'Failed to update user {caller_id}.'
          )
      ) from exc
    return user_to_update