custos-client-sdks/custos-python-sdk/custos/clients/agent_management_client.py (159 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import grpc
from custos.transport.settings import CustosServerClientSettings
from custos.server.integration.AgentManagementService_pb2_grpc import AgentManagementServiceStub
from custos.server.core.IamAdminService_pb2 import AgentClientMetadata, RegisterUserRequest, \
UserRepresentation, UserAttribute, AddUserAttributesRequest, DeleteUserAttributeRequest, AddUserRolesRequest, \
DeleteUserRolesRequest, AddProtocolMapperRequest, ClaimJSONTypes, MapperTypes
from custos.server.integration.AgentManagementService_pb2 import AgentSearchRequest
from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class AgentManagementClient(object):
def __init__(self, custos_server_setting):
self.custos_settings = custos_server_setting
self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT)
certManager = CertificateFetchingRestClient(custos_server_setting)
certManager.load_certificate()
with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f:
trusted_certs = f.read()
self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials)
self.agent_stub = AgentManagementServiceStub(self.channel)
def enable_agents(self, token):
"""
Enable agent registration for realm
:param token:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AgentClientMetadata()
return self.agent_stub.enableAgents(request, metadata=metadata)
except Exception:
logger.exception("Error occurred while enabling agents")
raise
def configure_agent_client(self, token, access_token_life_time):
"""
Configure agent client
:param token:
:param access_token_life_time:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AgentClientMetadata(access_token_life_time=access_token_life_time)
return self.agent_stub.enableAgents(request, metadata=metadata)
except Exception:
logger.exception("Error occurred while configuring agent client")
raise
def register_and_enable_agent(self, token, agent):
"""
Register and enable agent
:param agent:
:param token:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
attributeList = []
for atr in agent['attributes']:
attribute = UserAttribute(key=atr['key'], values=atr['values'])
attributeList.append(attribute)
id = agent['id']
realm_roles = agent['realm_roles']
user = UserRepresentation(id=id, realm_roles=realm_roles, attributes=attributeList)
request = RegisterUserRequest(user=user)
return self.agent_stub.registerAndEnableAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while enabling agents")
raise
def get_agent(self, token, id):
"""
Get agent having id
:param token:
:param id:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AgentSearchRequest(id=id)
return self.agent_stub.getAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while fetching agent")
raise
def delete_agent(self, token, id):
"""
Delete agent having id
:param token:
:param id:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AgentSearchRequest(id=id)
return self.agent_stub.deleteAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while deleting agent")
raise
def disable_agent(self, token, id):
"""
Disable agent having id
:param token:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AgentSearchRequest(id=id)
return self.agent_stub.disableAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while disabling agent")
raise
def enable_agent(self, token, id):
"""
Enable agent having id
:param token:
:param id:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AgentSearchRequest(id=id)
return self.agent_stub.enableAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while enabling agent")
raise
def add_agent_attributes(self, token, agents, attributes):
"""
Add attributes to agents
:param token:
:param agents:
:param attributes:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
attributeList = []
for atr in attributes:
attribute = UserAttribute(key=atr['key'], values=atr['values'])
attributeList.append(attribute)
request = AddUserAttributesRequest(attributes=attributeList, agents=agents)
return self.agent_stub.addAgentAttributes(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while adding agent attributes")
raise
def delete_agent_attributes(self, token, agents, attributes):
"""
Delete agent attributes of agents
:param token:
:param agents:
:param attributes:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
attributeList = []
for atr in attributes:
attribute = UserAttribute(key=atr['key'], values=atr['values'])
attributeList.append(attribute)
request = DeleteUserAttributeRequest(attributes=attributeList, agents=agents)
return self.agent_stub.deleteAgentAttributes(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while deleting agent attributes")
raise
def add_roles_to_agents(self, token, agents, roles):
"""
Add roles to agents
:param token:
:param agents:
:param roles:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = AddUserRolesRequest(agents=agents, roles=roles)
return self.agent_stub.addRolesToAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred whiling adding roles to agents")
raise
def delete_roles_from_agent(self, token, id, roles):
"""
Delete roles from agent
:param token:
:param id:
:param roles:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
request = DeleteUserRolesRequest(id=id, roles=roles)
return self.agent_stub.deleteRolesFromAgent(request=request, metadata=metadata)
except Exception:
logger.exception("Error occurred while enabling agents")
raise
def add_protocol_mapper(self, token, name, attribute_name, claim_name, claim_type, mapper_type,
add_to_id_token, add_to_access_token, add_to_user_info, multi_valued,
aggregate_attribute_values):
"""
Add protocol mapper to agent client
:param token:
:param name:
:param attribute_name:
:param claim_name:
:param claim_type:
:param mapper_type:
:param add_to_id_token:
:param add_to_access_token:
:param add_to_user_info:
:param multi_valued:
:param aggregate_attribute_values:
:return:
"""
try:
token = "Bearer " + token
metadata = (('authorization', token),)
wrapped_json_type = ClaimJSONTypes.Value(claim_type)
wrapped_mapper_type = MapperTypes.Value(mapper_type)
request = AddProtocolMapperRequest(name=name,
attribute_name=attribute_name,
claim_name=claim_name,
claim_type=wrapped_json_type,
mapper_type=wrapped_mapper_type,
add_to_id_token=add_to_id_token,
add_to_access_token=add_to_access_token,
add_to_user_info=add_to_user_info,
multi_valued=multi_valued,
aggregate_attribute_values=aggregate_attribute_values
)
return self.agent_stub.addProtocolMapper(request, metadata=metadata)
except Exception:
logger.exception("Error occurred in add_protocol_mapper, probably due to invalid parameters")
raise