custos-client-sdks/custos-python-sdk/custos/clients/user_management_client.py (203 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging import grpc from custos.transport.settings import CustosServerClientSettings from custos.server.integration.UserManagementService_pb2_grpc import UserManagementServiceStub from custos.server.core.IamAdminService_pb2 import RegisterUserRequest, UserRepresentation, RegisterUsersRequest, \ UserAttribute, \ AddUserAttributesRequest, DeleteUserAttributeRequest, UserSearchMetadata, FindUsersRequest, AddUserRolesRequest, \ UserSearchRequest, ResetUserPassword, DeleteUserRolesRequest from custos.server.core.UserProfileService_pb2 import UserProfile from custos.server.integration.UserManagementService_pb2 import LinkUserProfileRequest, UserProfileRequest from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) class UserManagementClient(object): def __init__(self, custos_server_setting): self.custos_settings = custos_server_setting self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT) certManager = CertificateFetchingRestClient(custos_server_setting) certManager.load_certificate() with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f: trusted_certs = f.read() self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs) self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials) self.user_stub = UserManagementServiceStub(self.channel) def register_user(self, token, username, first_name, last_name, password, email, is_temp_password): """ Register user in given tenant :param token: client credentials :param username: :param first_name: :param last_name: :param password: :param email: :param is_temp_password: :return: registration status """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserRepresentation(username=username, password=password, first_name=first_name, last_name=last_name, email=email, temporary_password=is_temp_password) request = RegisterUserRequest(user=user) return self.user_stub.registerUser(request, metadata=metadata) except Exception: logger.exception("Error occurred in register_user, probably due to invalid parameters") raise def register_and_enable_users(self, token, users): """ register and enable users :param token: admin access token :param users: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) usersList = [] for user in users: attributeList = [] for atr in user['attributes']: attribute = UserAttribute(key=atr['key'], values=atr['values']) attributeList.append(attribute) username = user['username'] password = user['password'] first_name = user['first_name'] last_name = user['last_name'] email = user['email'] temporary_password = user['temporary_password'] realm_roles = user['realm_roles'] client_roles = user['client_roles'] attributes = attributeList user = UserRepresentation(username=username, password=password, first_name=first_name, last_name=last_name, email=email, temporary_password=temporary_password, realm_roles=realm_roles, client_roles=client_roles, attributes=attributes) usersList.append(user) request = RegisterUsersRequest(users=usersList) return self.user_stub.registerAndEnableUsers(request, metadata=metadata) except Exception: logger.exception("Error occurred in register_and_enable_users, probably due to invalid parameters") raise def add_user_attributes(self, token, attributes, users): """ Add user attributes :param token: :param attributes: :param users: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) attributeList = [] for atr in attributes: attribute = UserAttribute(key=atr['key'], values=atr['values']) attributeList.append(attribute) request = AddUserAttributesRequest(attributes=attributeList, users=users) return self.user_stub.addUserAttributes(request, metadata=metadata) except Exception: logger.exception("Error occurred in add_user_attributes, probably due to invalid parameters") raise def delete_user_attributes(self, token, attributes, users): """ Delete user attributes :param token: user token :param attributes: :param users: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) attributeList = [] for atr in attributes: attribute = UserAttribute(key=atr['key'], values=atr['values']) attributeList.append(attribute) request = DeleteUserAttributeRequest(attributes=attributeList, users=users) return self.user_stub.deleteUserAttributes(request, metadata=metadata) except Exception: logger.exception("Error occurred in delete_user_attributes, probably due to invalid parameters") raise def enable_user(self, token, username): """ Enable user request :param token: client credential :param attributes: :param users: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserSearchMetadata(username=username) request = UserSearchRequest(user=user) return self.user_stub.enableUser(request, metadata=metadata) except Exception: logger.exception("Error occurred in enable_user, probably due to invalid parameters") raise def add_roles_to_users(self, token, usernames, roles, is_client_level): """ Add roles to users :param token: admin token :param usernames list of usersname :param : roles list of roles :param is_client_level to add client level else realm level :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = AddUserRolesRequest(usernames=usernames, roles=roles, client_level=is_client_level) return self.user_stub.addRolesToUsers(request, metadata=metadata) except Exception: logger.exception("Error occurred in add_roles_to_users, probably due to invalid parameters") raise def is_user_enabled(self, token, username): """ Check the weather user is enabled :param token: client credential :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserSearchMetadata(username=username) request = UserSearchRequest(user=user) return self.user_stub.isUserEnabled(request, metadata=metadata) except Exception: logger.exception("Error occurred in is_user_enabled, probably due to invalid parameters") raise def is_username_available(self, token, username): """ Check the weather username is available :param token: client credential :param username :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserSearchMetadata(username=username) request = UserSearchRequest(user=user) return self.user_stub.isUsernameAvailable(request, metadata=metadata) except Exception: logger.exception("Error occurred in is_username_available, probably due to invalid parameters") raise def get_user(self, token, username): """ Get user :param token: client credential :param username :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserSearchMetadata(username=username) request = UserSearchRequest(user=user) return self.user_stub.getUser(request, metadata=metadata) except Exception: logger.exception("Error occurred in get_user, probably due to invalid parameters") raise def find_users(self, token, offset, limit, username=None, firstname=None, lastname=None, email=None, ): """ Find users :param token: client credential :param username :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserSearchMetadata(username=username, first_name=firstname, last_name=lastname, email=email) request = FindUsersRequest(user=user, offset=offset, limit=limit) return self.user_stub.findUsers(request, metadata=metadata) except Exception: logger.exception("Error occurred in find_users, probably due to invalid parameters") raise def reset_password(self, token, username, password): """ Reset user password :param token: client credential :param username :param password :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = ResetUserPassword(username=username, password=password) return self.user_stub.resetPassword(request, metadata=metadata) except Exception: logger.exception("Error occurred in reset_password, probably due to invalid parameters") raise def delete_user(self, token, username): """ Delete user from a given realm :param token: admin credentials :param username: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) user = UserSearchMetadata(username=username) request = UserSearchRequest(user=user) return self.user_stub.deleteUser(request, metadata=metadata) except Exception: logger.exception("Error occurred in delete_user, probably due to invalid parameters") raise def delete_user_roles(self, token, username, client_roles, realm_roles): """ Delete user roles :param token: admin access token :param username: :param client_roles: :param realm_roles: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = DeleteUserRolesRequest(username=username, client_roles=client_roles, roles=realm_roles) return self.user_stub.deleteUserRoles(request, metadata=metadata) except Exception: logger.exception("Error occurred in delete_user_roles, probably due to invalid parameters") raise def update_user_profile(self, token, username, email, first_name, last_name): """ Update user profile :param token: user token :param username: :param email: :param first_name: :param last_name: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) profile = UserProfile(username=username, email=email, first_name=first_name, last_name=last_name) request = UserProfileRequest(user_profile=profile) return self.user_stub.updateUserProfile(request=request, metadata=metadata) except Exception: logger.exception("Error occurred in update_user_profile, probably due to invalid parameters") raise def link_user_profile(self, token, current_username, previous_username, linking_attributes=None): """ Link existing user profile with previous user profile :param previous_username: :param current_username: :param linking_attributes: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) attributeList = [] for atr in linking_attributes: attribute = UserAttribute(key=atr['key'], values=atr['values']) attributeList.append(attribute) request = LinkUserProfileRequest(current_username=current_username, previous_username=previous_username, linking_attributes=attributeList) return self.user_stub.linkUserProfile(request, metadata=metadata) except Exception: logger.exception("Error occurred in update_user_profile, probably due to invalid parameters") raise