source/idea/idea-cluster-manager/src/ideaclustermanager/app/api/accounts_api.py (278 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from ideasdk.api import BaseAPI, ApiInvocationContext
from ideadatamodel.auth import (
SignUpUserRequest,
SignUpUserResult,
ConfirmSignUpRequest,
ConfirmSignUpResult,
ResendConfirmationCodeRequest,
ResendConfirmationCodeResult,
GetUserRequest,
GetUserResult,
GetUserByEmailRequest,
GetUserByEmailResult,
ModifyUserRequest,
ModifyUserResult,
EnableUserRequest,
EnableUserResult,
DisableUserRequest,
DisableUserResult,
ListUsersRequest,
ResetPasswordRequest,
ResetPasswordResult,
ModifyGroupRequest,
ModifyGroupResult,
EnableGroupRequest,
EnableGroupResult,
DisableGroupRequest,
DisableGroupResult,
GetGroupRequest,
GetGroupResult,
ListGroupsRequest,
ListUsersInGroupRequest,
AddAdminUserRequest,
AddAdminUserResult,
RemoveAdminUserRequest,
RemoveAdminUserResult,
GlobalSignOutRequest,
GlobalSignOutResult,
)
from ideadatamodel import exceptions
from ideasdk.utils import Utils
import ideaclustermanager
from res.resources import accounts
class AccountsAPI(BaseAPI):
def __init__(self, context: ideaclustermanager.AppContext):
self.context = context
self.SCOPE_WRITE = f'{self.context.module_id()}/write'
self.SCOPE_READ = f'{self.context.module_id()}/read'
self.acl = {
'Accounts.SignUpUser': {
'scope': self.SCOPE_WRITE,
'method': self.sign_up_user
},
'Accounts.ConfirmSignUp': {
'scope': self.SCOPE_WRITE,
'method': self.confirm_sign_up
},
'Accounts.ResendConfirmationCode': {
'scope': self.SCOPE_WRITE,
'method': self.resend_confirmation_code
},
'Accounts.GetUser': {
'scope': self.SCOPE_READ,
'method': self.get_user
},
'Accounts.GetUserByEmail': {
'scope': self.SCOPE_READ,
'method': self.get_user_by_email
},
'Accounts.ModifyUser': {
'scope': self.SCOPE_WRITE,
'method': self.modify_user
},
'Accounts.ListUsers': {
'scope': self.SCOPE_READ,
'method': self.list_users
},
'Accounts.EnableUser': {
'scope': self.SCOPE_WRITE,
'method': self.enable_user
},
'Accounts.DisableUser': {
'scope': self.SCOPE_WRITE,
'method': self.disable_user
},
'Accounts.GetGroup': {
'scope': self.SCOPE_READ,
'method': self.get_group
},
'Accounts.ModifyGroup': {
'scope': self.SCOPE_WRITE,
'method': self.modify_group
},
'Accounts.EnableGroup': {
'scope': self.SCOPE_WRITE,
'method': self.enable_group
},
'Accounts.DisableGroup': {
'scope': self.SCOPE_WRITE,
'method': self.disable_group
},
'Accounts.ListGroups': {
'scope': self.SCOPE_READ,
'method': self.list_groups
},
'Accounts.ListUsersInGroup': {
'scope': self.SCOPE_READ,
'method': self.list_users_in_group
},
'Accounts.AddAdminUser': {
'scope': self.SCOPE_WRITE,
'method': self.add_admin_user
},
'Accounts.RemoveAdminUser': {
'scope': self.SCOPE_WRITE,
'method': self.remove_admin_user
},
'Accounts.GlobalSignOut': {
'scope': self.SCOPE_WRITE,
'method': self.global_sign_out
},
'Accounts.ResetPassword': {
'scope': self.SCOPE_WRITE,
'method': self.reset_password
}
}
@property
def token_service(self):
return self.context.token_service
def is_applicable(self, context: ApiInvocationContext, scope: str) -> bool:
access_token = context.access_token
decoded_token = self.token_service.decode_token(access_token)
groups = Utils.get_value_as_list('cognito:groups', decoded_token)
if Utils.is_not_empty(groups) and self.context.user_pool.admin_group_name in groups:
return True
token_scope = Utils.get_value_as_string('scope', decoded_token)
if Utils.is_empty(token_scope):
return False
return scope in token_scope.split(' ')
def sign_up_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(SignUpUserRequest)
self.context.accounts.sign_up_user(request)
context.success(SignUpUserResult())
def confirm_sign_up(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ConfirmSignUpRequest)
self.context.accounts.confirm_sign_up(request)
context.success(ConfirmSignUpResult())
def resend_confirmation_code(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ResendConfirmationCodeRequest)
self.context.accounts.resend_confirmation_code(request)
context.success(ResendConfirmationCodeResult())
def get_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(GetUserRequest)
user = self.context.accounts.get_user(request.username)
context.success(GetUserResult(user=user))
def get_user_by_email(self, context: ApiInvocationContext):
request = context.get_request_payload_as(GetUserByEmailRequest)
user = self.context.accounts.get_user_by_email(request.email)
context.success(GetUserByEmailResult(user=user))
def modify_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ModifyUserRequest)
user = self.context.accounts.modify_user(request.user)
context.success(ModifyUserResult(
user=user
))
def enable_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(EnableUserRequest)
accounts.update_user(user={'username': request.username, 'enabled': True}, force=True)
context.success(EnableUserResult())
def disable_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(DisableUserRequest)
self.context.accounts.disable_user(request.username)
context.success(DisableUserResult())
def list_users(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ListUsersRequest)
result = self.context.accounts.list_users(request)
context.success(result)
def global_sign_out(self, context: ApiInvocationContext):
request = context.get_request_payload_as(GlobalSignOutRequest)
self.context.accounts.global_sign_out(username=request.username)
context.success(GlobalSignOutResult())
def reset_password(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ResetPasswordRequest)
self.context.accounts.reset_password(request.username)
context.success(ResetPasswordResult())
def modify_group(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ModifyGroupRequest)
modified_group = self.context.accounts.modify_group(request.group)
context.success(ModifyGroupResult(
group=modified_group
))
def enable_group(self, context: ApiInvocationContext):
request = context.get_request_payload_as(EnableGroupRequest)
accounts.update_group({
'group_name': request.group_name,
'enabled': True
}, force=True)
context.success(EnableGroupResult())
def disable_group(self, context: ApiInvocationContext):
request = context.get_request_payload_as(DisableGroupRequest)
accounts.update_group({
'group_name': request.group_name,
'enabled': False
}, force=True)
context.success(DisableGroupResult())
def get_group(self, context: ApiInvocationContext):
request = context.get_request_payload_as(GetGroupRequest)
group = self.context.accounts.get_group(request.group_name)
context.success(GetGroupResult(
group=group
))
def list_groups(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ListGroupsRequest)
result = self.context.accounts.list_groups(request)
context.success(result)
def list_users_in_group(self, context: ApiInvocationContext):
request = context.get_request_payload_as(ListUsersInGroupRequest)
result = self.context.accounts.list_users_in_group(request)
context.success(result)
def add_admin_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(AddAdminUserRequest)
accounts.update_user({
'username': request.username,
'role': 'admin',
'sudo': True
})
user = self.context.accounts.get_user(request.username)
context.success(AddAdminUserResult(
user=user
))
def remove_admin_user(self, context: ApiInvocationContext):
request = context.get_request_payload_as(RemoveAdminUserRequest)
self.context.accounts.remove_admin_user(request.username)
user = self.context.accounts.get_user(request.username)
context.success(RemoveAdminUserResult(
user=user
))
@staticmethod
def check_admin_authorization_services(context: ApiInvocationContext):
"""
only an admin user should be able to create additional admin users or perform
admin user related operations
a "manager" user should be able to add new users, but without admin access
an admin user cannot be created using client credentials grant scope. needs an admin "user" to create a admin user.
"""
namespace = context.namespace
is_admin_authorization_required = namespace in (
'Accounts.AddAdminUser',
'Accounts.RemoveAdminUser',
'Accounts.ModifyUser'
)
if not is_admin_authorization_required:
return
if context.is_administrator():
return
raise exceptions.unauthorized_access()
def invoke(self, context: ApiInvocationContext):
namespace = context.namespace
acl_entry = Utils.get_value_as_dict(namespace, self.acl)
if acl_entry is None:
raise exceptions.unauthorized_access()
# special check only applicable for admin users related functionality
self.check_admin_authorization_services(context)
acl_entry_scope = Utils.get_value_as_string('scope', acl_entry)
enable_self_sign_up = self.context.config().get_bool('identity-provider.cognito.enable_self_sign_up', required=True)
enable_native_user_login = self.context.config().get_bool('identity-provider.cognito.enable_native_user_login', required=True)
# If `enable_self_sign_up` is True and `enable_native_user` is True all customers with access to the web app can sign up for, verify, and resend confirmation code for their account
if namespace in ['Accounts.SignUpUser', 'Accounts.ConfirmSignUp', 'Accounts.ResendConfirmationCode'] and enable_self_sign_up and enable_native_user_login:
acl_entry['method'](context)
return
is_authorized = context.is_authorized(elevated_access=True, scopes=[acl_entry_scope])
# Allow if admin
if is_authorized:
acl_entry['method'](context)
return
if namespace == 'Accounts.ListUsers':
username = context.get_username()
user = self.context.accounts.get_user(username)
if self.context.role_assignments.check_permissions_in_any_role(user=user, permissions=["projects.update_personnel","vdis.create_terminate_others_sessions"]):
acl_entry['method'](context)
return
elif namespace == 'Accounts.ListGroups':
username = context.get_username()
user = self.context.accounts.get_user(username)
if self.context.role_assignments.check_permissions_in_any_role(user=user, permissions=["projects.update_personnel"]):
acl_entry['method'](context)
return
raise exceptions.unauthorized_access()