custos-client-sdks/custos-python-sdk/build/lib/custos/clients/user_management_client.py [24:387]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    UserAttribute, \
    AddUserAttributesRequest, DeleteUserAttributeRequest, UserSearchMetadata, FindUsersRequest, AddUserRolesRequest, \
    UserSearchRequest, ResetUserPassword, DeleteUserRolesRequest
from custos.server.core.UserProfileService_pb2 import UserProfile
from custos.server.integration.UserManagementService_pb2 import LinkUserProfileRequest, UserProfileRequest

from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class UserManagementClient(object):

    def __init__(self, custos_server_setting):
        self.custos_settings = custos_server_setting
        self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT)
        certManager = CertificateFetchingRestClient(custos_server_setting)
        certManager.load_certificate()
        with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f:
            trusted_certs = f.read()
        self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
        self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials)
        self.user_stub = UserManagementServiceStub(self.channel)

    def register_user(self, token, username, first_name, last_name, password, email, is_temp_password):
        """
        Register user in given tenant
        :param token:  client credentials
        :param username:
        :param first_name:
        :param last_name:
        :param password:
        :param email:
        :param is_temp_password:
        :return: registration status
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserRepresentation(username=username, password=password,
                                      first_name=first_name, last_name=last_name,
                                      email=email, temporary_password=is_temp_password)

            request = RegisterUserRequest(user=user)

            return self.user_stub.registerUser(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in register_user, probably due to invalid parameters")
            raise

    def register_and_enable_users(self, token, users):
        """
        register and enable users
        :param token: admin access token
        :param users:
        :return:
        """

        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            usersList = []

            for user in users:
                attributeList = []
                for atr in user['attributes']:
                    attribute = UserAttribute(key=atr['key'], values=atr['values'])
                    attributeList.append(attribute)
                username = user['username']
                password = user['password']
                first_name = user['first_name']
                last_name = user['last_name']
                email = user['email']
                temporary_password = user['temporary_password']
                realm_roles = user['realm_roles']
                client_roles = user['client_roles']
                attributes = attributeList
                user = UserRepresentation(username=username, password=password,
                                          first_name=first_name, last_name=last_name,
                                          email=email, temporary_password=temporary_password,
                                          realm_roles=realm_roles, client_roles=client_roles,
                                          attributes=attributes)

                usersList.append(user)

            request = RegisterUsersRequest(users=usersList)
            return self.user_stub.registerAndEnableUsers(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in register_and_enable_users, probably due to invalid parameters")
            raise

    def add_user_attributes(self, token, attributes, users):
        """
        Add user attributes
        :param token:
        :param attributes:
        :param users:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            attributeList = []
            for atr in attributes:
                attribute = UserAttribute(key=atr['key'], values=atr['values'])
                attributeList.append(attribute)

            request = AddUserAttributesRequest(attributes=attributeList, users=users)
            return self.user_stub.addUserAttributes(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in add_user_attributes, probably due to invalid parameters")
            raise

    def delete_user_attributes(self, token, attributes, users):
        """
        Delete user attributes
        :param token: user token
        :param attributes:
        :param users:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            attributeList = []
            for atr in attributes:
                attribute = UserAttribute(key=atr['key'], values=atr['values'])
                attributeList.append(attribute)

            request = DeleteUserAttributeRequest(attributes=attributeList, users=users)
            return self.user_stub.deleteUserAttributes(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in delete_user_attributes, probably due to invalid parameters")
            raise

    def enable_user(self, token, username):
        """
        Enable user request
        :param token: client credential
        :param attributes:
        :param users:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.enableUser(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in enable_user, probably due to invalid parameters")
            raise

    def add_roles_to_users(self, token, usernames, roles, is_client_level):
        """
        Add roles to users
        :param token: admin token
        :param usernames list of usersname
        :param : roles list of roles
        :param is_client_level to add client level else realm level
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            request = AddUserRolesRequest(usernames=usernames, roles=roles, client_level=is_client_level)
            return self.user_stub.addRolesToUsers(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in add_roles_to_users, probably due to invalid parameters")
            raise

    def is_user_enabled(self, token, username):
        """
        Check the weather user is enabled
        :param token: client credential
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.isUserEnabled(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in is_user_enabled, probably due to invalid parameters")
            raise

    def is_username_available(self, token, username):
        """
        Check the weather username  is available
        :param token: client credential
        :param username
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.isUsernameAvailable(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in is_username_available, probably due to invalid parameters")
            raise

    def get_user(self, token, username):
        """
        Get user
        :param token: client credential
        :param username
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.getUser(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in get_user, probably due to invalid parameters")
            raise

    def find_users(self, token, offset, limit, username=None, firstname=None, lastname=None, email=None, ):
        """
        Find users
        :param token: client credential
        :param username
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username, first_name=firstname, last_name=lastname, email=email)
            request = FindUsersRequest(user=user, offset=offset, limit=limit)
            return self.user_stub.findUsers(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in find_users, probably due to invalid parameters")
            raise

    def reset_password(self, token, username, password):
        """
        Reset user password
        :param token: client credential
        :param username
        :param password
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            request = ResetUserPassword(username=username, password=password)
            return self.user_stub.resetPassword(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in reset_password, probably due to invalid parameters")
            raise

    def delete_user(self, token, username):
        """
        Delete user from a given realm
        :param token: admin credentials
        :param username:
        :return:
        """

        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.deleteUser(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in delete_user, probably due to invalid parameters")
            raise

    def delete_user_roles(self, token, username, client_roles, realm_roles):
        """
        Delete user roles
        :param token: admin access token
        :param username:
        :param client_roles:
        :param realm_roles:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            request = DeleteUserRolesRequest(username=username, client_roles=client_roles, roles=realm_roles)
            return self.user_stub.deleteUserRoles(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in delete_user_roles, probably due to invalid parameters")
            raise

    def update_user_profile(self, token, username, email, first_name, last_name):
        """
        Update user profile
        :param token: user token
        :param username:
        :param email:
        :param first_name:
        :param last_name:
        :return:
        """

        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            profile = UserProfile(username=username, email=email, first_name=first_name, last_name=last_name)
            request = UserProfileRequest(user_profile=profile)
            return self.user_stub.updateUserProfile(request=request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in update_user_profile, probably due to invalid parameters")
            raise

    def link_user_profile(self, token, current_username, previous_username, linking_attributes=None):
        """
        Link existing user profile with previous user profile
        :param previous_username:
        :param current_username:
        :param linking_attributes:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            attributeList = []
            for atr in linking_attributes:
                attribute = UserAttribute(key=atr['key'], values=atr['values'])
                attributeList.append(attribute)

            request = LinkUserProfileRequest(current_username=current_username, previous_username=previous_username,
                                             linking_attributes=attributeList)
            return self.user_stub.linkUserProfile(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in update_user_profile, probably due to invalid parameters")
            raise
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



custos-client-sdks/custos-python-sdk/custos/clients/user_management_client.py [24:387]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    UserAttribute, \
    AddUserAttributesRequest, DeleteUserAttributeRequest, UserSearchMetadata, FindUsersRequest, AddUserRolesRequest, \
    UserSearchRequest, ResetUserPassword, DeleteUserRolesRequest
from custos.server.core.UserProfileService_pb2 import UserProfile
from custos.server.integration.UserManagementService_pb2 import LinkUserProfileRequest, UserProfileRequest

from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class UserManagementClient(object):

    def __init__(self, custos_server_setting):
        self.custos_settings = custos_server_setting
        self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT)
        certManager = CertificateFetchingRestClient(custos_server_setting)
        certManager.load_certificate()
        with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f:
            trusted_certs = f.read()
        self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
        self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials)
        self.user_stub = UserManagementServiceStub(self.channel)

    def register_user(self, token, username, first_name, last_name, password, email, is_temp_password):
        """
        Register user in given tenant
        :param token:  client credentials
        :param username:
        :param first_name:
        :param last_name:
        :param password:
        :param email:
        :param is_temp_password:
        :return: registration status
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserRepresentation(username=username, password=password,
                                      first_name=first_name, last_name=last_name,
                                      email=email, temporary_password=is_temp_password)

            request = RegisterUserRequest(user=user)

            return self.user_stub.registerUser(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in register_user, probably due to invalid parameters")
            raise

    def register_and_enable_users(self, token, users):
        """
        register and enable users
        :param token: admin access token
        :param users:
        :return:
        """

        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            usersList = []

            for user in users:
                attributeList = []
                for atr in user['attributes']:
                    attribute = UserAttribute(key=atr['key'], values=atr['values'])
                    attributeList.append(attribute)
                username = user['username']
                password = user['password']
                first_name = user['first_name']
                last_name = user['last_name']
                email = user['email']
                temporary_password = user['temporary_password']
                realm_roles = user['realm_roles']
                client_roles = user['client_roles']
                attributes = attributeList
                user = UserRepresentation(username=username, password=password,
                                          first_name=first_name, last_name=last_name,
                                          email=email, temporary_password=temporary_password,
                                          realm_roles=realm_roles, client_roles=client_roles,
                                          attributes=attributes)

                usersList.append(user)

            request = RegisterUsersRequest(users=usersList)
            return self.user_stub.registerAndEnableUsers(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in register_and_enable_users, probably due to invalid parameters")
            raise

    def add_user_attributes(self, token, attributes, users):
        """
        Add user attributes
        :param token:
        :param attributes:
        :param users:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            attributeList = []
            for atr in attributes:
                attribute = UserAttribute(key=atr['key'], values=atr['values'])
                attributeList.append(attribute)

            request = AddUserAttributesRequest(attributes=attributeList, users=users)
            return self.user_stub.addUserAttributes(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in add_user_attributes, probably due to invalid parameters")
            raise

    def delete_user_attributes(self, token, attributes, users):
        """
        Delete user attributes
        :param token: user token
        :param attributes:
        :param users:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            attributeList = []
            for atr in attributes:
                attribute = UserAttribute(key=atr['key'], values=atr['values'])
                attributeList.append(attribute)

            request = DeleteUserAttributeRequest(attributes=attributeList, users=users)
            return self.user_stub.deleteUserAttributes(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in delete_user_attributes, probably due to invalid parameters")
            raise

    def enable_user(self, token, username):
        """
        Enable user request
        :param token: client credential
        :param attributes:
        :param users:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.enableUser(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in enable_user, probably due to invalid parameters")
            raise

    def add_roles_to_users(self, token, usernames, roles, is_client_level):
        """
        Add roles to users
        :param token: admin token
        :param usernames list of usersname
        :param : roles list of roles
        :param is_client_level to add client level else realm level
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            request = AddUserRolesRequest(usernames=usernames, roles=roles, client_level=is_client_level)
            return self.user_stub.addRolesToUsers(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in add_roles_to_users, probably due to invalid parameters")
            raise

    def is_user_enabled(self, token, username):
        """
        Check the weather user is enabled
        :param token: client credential
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.isUserEnabled(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in is_user_enabled, probably due to invalid parameters")
            raise

    def is_username_available(self, token, username):
        """
        Check the weather username  is available
        :param token: client credential
        :param username
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.isUsernameAvailable(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in is_username_available, probably due to invalid parameters")
            raise

    def get_user(self, token, username):
        """
        Get user
        :param token: client credential
        :param username
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.getUser(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in get_user, probably due to invalid parameters")
            raise

    def find_users(self, token, offset, limit, username=None, firstname=None, lastname=None, email=None, ):
        """
        Find users
        :param token: client credential
        :param username
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username, first_name=firstname, last_name=lastname, email=email)
            request = FindUsersRequest(user=user, offset=offset, limit=limit)
            return self.user_stub.findUsers(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in find_users, probably due to invalid parameters")
            raise

    def reset_password(self, token, username, password):
        """
        Reset user password
        :param token: client credential
        :param username
        :param password
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            request = ResetUserPassword(username=username, password=password)
            return self.user_stub.resetPassword(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in reset_password, probably due to invalid parameters")
            raise

    def delete_user(self, token, username):
        """
        Delete user from a given realm
        :param token: admin credentials
        :param username:
        :return:
        """

        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            user = UserSearchMetadata(username=username)
            request = UserSearchRequest(user=user)
            return self.user_stub.deleteUser(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in delete_user, probably due to invalid parameters")
            raise

    def delete_user_roles(self, token, username, client_roles, realm_roles):
        """
        Delete user roles
        :param token: admin access token
        :param username:
        :param client_roles:
        :param realm_roles:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            request = DeleteUserRolesRequest(username=username, client_roles=client_roles, roles=realm_roles)
            return self.user_stub.deleteUserRoles(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in delete_user_roles, probably due to invalid parameters")
            raise

    def update_user_profile(self, token, username, email, first_name, last_name):
        """
        Update user profile
        :param token: user token
        :param username:
        :param email:
        :param first_name:
        :param last_name:
        :return:
        """

        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            profile = UserProfile(username=username, email=email, first_name=first_name, last_name=last_name)
            request = UserProfileRequest(user_profile=profile)
            return self.user_stub.updateUserProfile(request=request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in update_user_profile, probably due to invalid parameters")
            raise

    def link_user_profile(self, token, current_username, previous_username, linking_attributes=None):
        """
        Link existing user profile with previous user profile
        :param previous_username:
        :param current_username:
        :param linking_attributes:
        :return:
        """
        try:
            token = "Bearer " + token
            metadata = (('authorization', token),)

            attributeList = []
            for atr in linking_attributes:
                attribute = UserAttribute(key=atr['key'], values=atr['values'])
                attributeList.append(attribute)

            request = LinkUserProfileRequest(current_username=current_username, previous_username=previous_username,
                                             linking_attributes=attributeList)
            return self.user_stub.linkUserProfile(request, metadata=metadata)

        except Exception:
            logger.exception("Error occurred in update_user_profile, probably due to invalid parameters")
            raise
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



