custos-client-sdks/custos-python-sdk/custos/clients/group_management_client.py (95 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging import grpc from custos.transport.settings import CustosServerClientSettings from custos.server.integration.GroupManagementService_pb2_grpc import GroupManagementServiceStub from custos.server.core.UserProfileService_pb2 import GroupToGroupMembership, Group, GroupRequest, GroupMembership from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) class GroupManagementClient(object): def __init__(self, custos_server_setting): self.custos_settings = custos_server_setting self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT) certManager = CertificateFetchingRestClient(custos_server_setting) certManager.load_certificate() with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f: trusted_certs = f.read() self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs) self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials) self.group_stub = GroupManagementServiceStub(self.channel) def create_group(self, token, name, description, owner_id): """ Create groups :param owner_id: :param description: :param name: :param token: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) rep = Group(name=name, realm_roles=[], client_roles=[], attributes=[], description=description, owner_id=owner_id) request = GroupRequest(group=rep) return self.group_stub.createGroup(request=request, metadata=metadata) except Exception: logger.exception("Error occurred while creating groups") raise def delete_group(self, token, id): """ delete group using group id :param token: :param id: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = GroupRequest(id=id) return self.group_stub.deleteGroup(request=request, metadata=metadata) except Exception: logger.exception("Error occurred while deleting group") raise def find_group(self, token, group_name, group_id): """ find group using group name :param token: :param group_name: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) gr = Group(id=group_id, name=group_name) request = GroupRequest(id=group_id, group=gr) return self.group_stub.findGroup(request=request, metadata=metadata) except Exception: logger.exception("Error occurred finding group") raise def get_all_groups(self, token): """ Get all groups of tenant :param token: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = GroupRequest() return self.group_stub.getAllGroups(request=request, metadata=metadata) except Exception: logger.exception("Error occurred while pulling groups") raise def add_user_to_group(self, token, username, group_id, membership_type): """ Add user to group :param token: :param username: :param group_id: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = GroupMembership(username=username, group_id=group_id, type=membership_type) return self.group_stub.addUserToGroup(request=request, metadata=metadata) except Exception: logger.exception("Error occurred while adding user to group") raise def remove_user_from_group(self, token, username, group_id): """ Remove user from group :param token: :param username: :param group_id: :return: """ try: token = "Bearer " + token metadata = (('authorization', token),) request = GroupMembership(username=username, group_id=group_id) return self.group_stub.removeUserFromGroup(request=request, metadata=metadata) except Exception: logger.exception("Error occurred while removing user from group") raise def add_child_group(self, token, parent_group_id, child_group_id): try: token = "Bearer " + token metadata = (('authorization', token),) grm = GroupToGroupMembership(child_id=child_group_id, parent_id=parent_group_id) return self.group_stub.addChildGroupToParentGroup(request=grm, metadata=metadata) except Exception: logger.exception("Error occurred while adding child group") raise def remove_child_group(self, token, parent_group_id, child_group_id): try: token = "Bearer " + token metadata = (('authorization', token),) grm = GroupToGroupMembership(child_id=child_group_id, parent_id=parent_group_id) return self.group_stub.removeChildGroupFromParentGroup(request=grm, metadata=metadata) except Exception: logger.exception("Error occurred while removing child group from group") raise