custos-client-sdks/custos-python-sdk/build/lib/custos/clients/tenant_management_client.py (170 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging import grpc from custos.server.integration.TenantManagementService_pb2_grpc import TenantManagementServiceStub from custos.server.core.TenantProfileService_pb2 import Tenant, GetTenantsRequest, GetAllTenantsForUserRequest from custos.server.core.IamAdminService_pb2 import AddRolesRequest, RoleRepresentation, AddProtocolMapperRequest, \ ClaimJSONTypes, MapperTypes from custos.server.integration.TenantManagementService_pb2 import GetTenantRequest, \ UpdateTenantRequest, DeleteTenantRequest from custos.transport.settings import CustosServerClientSettings from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) class TenantManagementClient(object): def __init__(self, custos_server_setting): self.custos_settings = custos_server_setting self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT) certManager = CertificateFetchingRestClient(custos_server_setting) certManager.load_certificate() with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f: trusted_certs = f.read() self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs) self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials) self.tenant_stub = TenantManagementServiceStub(self.channel) def create_admin_tenant(self, client_name, requester_email, admin_frist_name, admin_last_name, admin_email, admin_username, admin_password, contacts, redirect_uris, client_uri, scope, domain, logo_uri, comment): """ Creates admin tenant client. Needs to be approved by Custos Admin :return: Custos Credentials """ try: tenant = Tenant(client_name=client_name, requester_email=requester_email, admin_first_name=admin_frist_name, admin_last_name=admin_last_name, admin_email=admin_email, admin_username=admin_username, admin_password=admin_password, contacts=contacts, redirect_uris=redirect_uris, client_uri=client_uri, scope=scope, domain=domain, logo_uri=logo_uri, comment=comment, application_type="web") return self.tenant_stub.createTenant(tenant) except Exception: logger.exception("Error occurred in create_admin_tenant, probably due to invalid parameters") raise def create_tenant(self, client_token, client_name, requester_email, admin_frist_name, admin_last_name, admin_email, admin_username, admin_password, contacts, redirect_uris, client_uri, scope, domain, logo_uri, comment): """ Creates child tenant under admin tenant. Automatically activates :return: Custos credentials """ try: tenant = Tenant(client_name=client_name, requester_email=requester_email, admin_first_name=admin_frist_name, admin_last_name=admin_last_name, admin_email=admin_email, admin_username=admin_username, admin_password=admin_password, contacts=contacts, redirect_uris=redirect_uris, client_uri=client_uri, scope=scope, domain=domain, logo_uri=logo_uri, comment=comment, application_type="web") token = "Bearer " + client_token metadata = (('authorization', token),) return self.tenant_stub.createTenant(tenant, metadata=metadata) except Exception: logger.exception("Error occurred in create_tenant, probably due to invalid parameters") raise def get_tenant(self, client_token, client_id): """ Fetch tenant :return: Tenant """ try: request = GetTenantRequest(client_id=client_id) token = "Bearer " + client_token metadata = (('authorization', token),) return self.tenant_stub.getTenant(request, metadata=metadata) except Exception: logger.exception("Error occurred in get_tenant, probably due to invalid parameters") raise def update_tenant(self, client_token, client_id, client_name, requester_email, admin_frist_name, admin_last_name, admin_email, admin_username, admin_password, contacts, redirect_uris, client_uri, scope, domain, logo_uri, comment): """ Update given tenant by client Id :return: updated tenant """ try: tenant = Tenant(client_name=client_name, requester_email=requester_email, admin_first_name=admin_frist_name, admin_last_name=admin_last_name, admin_email=admin_email, admin_username=admin_username, admin_password=admin_password, contacts=contacts, redirect_uris=redirect_uris, client_uri=client_uri, scope=scope, domain=domain, logo_uri=logo_uri, comment=comment, application_type="web") token = "Bearer " + client_token metadata = (('authorization', token),) request = UpdateTenantRequest(client_id=client_id, body=tenant) return self.tenant_stub.updateTenant(request, metadata=metadata) except Exception: logger.exception("Error occurred in update_tenant, probably due to invalid parameters") raise def delete_tenant(self, token, client_id): """ Delete given tenant by client Id :return: void """ try: token = "Bearer " + token metadata = (('authorization', token),) request = DeleteTenantRequest(client_id=client_id) return self.tenant_stub.deleteTenant(request, metadata=metadata) except Exception: logger.exception("Error occurred in delete_tenant, probably due to invalid parameters") raise def add_tenant_roles(self, token, roles, is_client_level): """ :param token :param: roles include realm or client level roles as array :param is_client_level boolean to indicate to add roles to client :return: void """ try: token = "Bearer " + token metadata = (('authorization', token),) rolesRepArray = [] for role in roles: rolesRep = RoleRepresentation(name=role['name'], description=role['description'], composite=role['composite']) rolesRepArray.append(rolesRep) request = AddRolesRequest(roles=rolesRepArray, client_level=is_client_level) return self.tenant_stub.addTenantRoles(request, metadata=metadata) except Exception: logger.exception("Error occurred in add_tenant_roles, probably due to invalid parameters") raise def add_protocol_mapper(self, token, name, attribute_name, claim_name, claim_type, mapper_type, add_to_id_token, add_to_access_token, add_to_user_info, multi_valued, aggregate_attribute_values): """ Protocol mapper enables to add user attributes, user realm roles or user client roles to be added to ID token, Access token. :param token :param: roles include realm or client level roles as array :param is_client_level boolean to indicate to add roles to client :return: void """ try: token = "Bearer " + token metadata = (('authorization', token),) wrapped_json_type = ClaimJSONTypes.Value(claim_type) wrapped_mapper_type = MapperTypes.Value(mapper_type) request = AddProtocolMapperRequest(name=name, attribute_name=attribute_name, claim_name=claim_name, claim_type=wrapped_json_type, mapper_type=wrapped_mapper_type, add_to_id_token=add_to_id_token, add_to_access_token=add_to_access_token, add_to_user_info=add_to_user_info, multi_valued=multi_valued, aggregate_attribute_values=aggregate_attribute_values ) return self.tenant_stub.addProtocolMapper(request, metadata=metadata) except Exception: logger.exception("Error occurred in add_protocol_mapper, probably due to invalid parameters") raise def get_child_tenants(self, token, offset, limit, status): """ Get child tenants of the calling tenant :param token :param: offset omit initial number of results equalt to offset :param limit results should contain maximum number of entries :param status (ACTIVE, REQUESTED, DENIED, CANCELLED, DEACTIVATED) :return: Tenants """ try: token = "Bearer " + token metadata = (('authorization', token),) request = GetTenantsRequest(offset=offset, limit=limit, status=status) return self.tenant_stub.getChildTenants(request, metadata=metadata) except Exception: logger.exception("Error occurred in get_child_tenants, probably due to invalid parameters") raise def get_all_tenants(self, token, email): """ Get all tenants requested by given user :param token :param email get all tenants requested by email :return: Tenants """ try: token = "Bearer " + token metadata = (('authorization', token),) request = GetAllTenantsForUserRequest(email=email) return self.tenant_stub.getAllTenantsForUser(request, metadata=metadata) except Exception: logger.exception("Error occurred in get_all_tenants, probably due to invalid parameters") raise