custos-client-sdks/custos-python-sdk/build/lib/custos/clients/identity_management_client.py (121 lines of code) (raw):

#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License. You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import logging

import grpc

from custos.transport.settings import CustosServerClientSettings
from custos.server.integration.IdentityManagementService_pb2_grpc import IdentityManagementServiceStub
from custos.server.core.IdentityService_pb2 import AuthenticationRequest, AuthToken, Claim, \
    GetUserManagementSATokenRequest, \
    GetTokenRequest, GetOIDCConfiguration, EndSessionRequest
from custos.server.integration.IdentityManagementService_pb2 import AuthorizationRequest, GetCredentialsRequest, \
    EndSessionRequest as Er, GetAgentTokenRequest
from custos.clients.utils.certificate_fetching_rest_client import CertificateFetchingRestClient

logger = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG on a library logger overrides whatever level the
# host application configured; kept as-is so existing log output is unchanged.
logger.setLevel(logging.DEBUG)


class IdentityManagementClient(object):
    """Thin client around the ``IdentityManagementService`` gRPC stub.

    Construction loads the server certificate through
    ``CertificateFetchingRestClient``, reads it from ``CUSTOS_CERT_PATH`` and
    opens a TLS-secured channel to ``CUSTOS_SERVER_HOST:CUSTOS_SERVER_PORT``.

    Every RPC wrapper logs the failure with a traceback and re-raises the
    original exception, so callers see exactly what the stub raised.

    The instance can be used as a context manager so that the channel is
    closed deterministically::

        with IdentityManagementClient(settings) as client:
            client.authenticate(token, username, password)
    """

    def __init__(self, custos_server_setting):
        """
        Create the secure channel and the service stub

        :param custos_server_setting: settings object exposing
            ``CUSTOS_SERVER_HOST``, ``CUSTOS_SERVER_PORT`` and ``CUSTOS_CERT_PATH``
        """
        self.custos_settings = custos_server_setting
        self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT)

        # The certificate must be loaded before CUSTOS_CERT_PATH is read below.
        cert_manager = CertificateFetchingRestClient(custos_server_setting)
        cert_manager.load_certificate()

        with open(self.custos_settings.CUSTOS_CERT_PATH, 'rb') as f:
            trusted_certs = f.read()

        self.channel_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
        self.channel = grpc.secure_channel(target=self.target, credentials=self.channel_credentials)
        self.identity_stub = IdentityManagementServiceStub(self.channel)

    def close(self):
        """
        Close the underlying gRPC channel

        The client must not be used for further calls after this.
        """
        self.channel.close()

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the channel on leaving the ``with`` block; exceptions propagate."""
        self.close()

    @staticmethod
    def _bearer_metadata(token):
        """
        Build the gRPC call metadata carrying the authorization header

        :param token: credential string sent after the ``Bearer `` prefix
        :return: metadata tuple ``(('authorization', 'Bearer <token>'),)``
        """
        # Plain concatenation on purpose: a non-string token (e.g. None) raises
        # TypeError here instead of being silently formatted into the header.
        return (('authorization', "Bearer " + token),)

    def authenticate(self, token, username, password):
        """
        Used for local authentication

        :param token: client credentials
        :param username: Users username
        :param password: Users password
        :return: response of the ``authenticate`` RPC (access token)
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            request = AuthenticationRequest(username=username, password=password)
            return self.identity_stub.authenticate(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in authenticate, probably due to invalid parameters")
            raise

    def is_authenticated(self, token, user_access_token, username):
        """
        Check access token is valid

        :param token: client credential token
        :param user_access_token: access token of user
        :param username: sent to the server as the ``username`` claim
        :return: response of the ``isAuthenticated`` RPC (status TRUE, FALSE)
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            claims = [Claim(key="username", value=username)]
            request = AuthToken(accessToken=user_access_token, claims=claims)
            return self.identity_stub.isAuthenticated(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in is_authenticated, probably due to invalid parameters")
            raise

    def get_service_account_access_token(self, token):
        """
        Get service account access token

        :param token: client credentials
        :return: response of the ``getUserManagementServiceAccountAccessToken`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            request = GetUserManagementSATokenRequest()
            return self.identity_stub.getUserManagementServiceAccountAccessToken(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in get_service_account_access_token, probably due to invalid parameters")
            raise

    def authorize(self, client_id, redirect_uri, response_type, scope, state):
        """
        return authorize url of keycloak

        This call is sent without authorization metadata.

        :param client_id: client id placed in the authorization request
        :param redirect_uri: redirect URI of client
        :param response_type: response type (code)
        :param scope: (openid email profile)
        :param state: (random number)
        :return: response of the ``authorize`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            request = AuthorizationRequest(client_id=client_id, redirect_uri=redirect_uri,
                                           response_type=response_type, scope=scope, state=state)
            return self.identity_stub.authorize(request)
        except Exception:
            logger.exception("Error occurred in authorize, probably due to invalid parameters")
            raise

    def token(self, token, redirect_uri=None, code=None, username=None, password=None, refresh_token=None,
              grant_type=None):
        """
        provide user access token

        All optional arguments are forwarded unchanged into ``GetTokenRequest``.

        :param token: client credentials
        :param redirect_uri: redirect uri
        :param code: code returned from token
        :param username: username field of the token request
        :param password: password field of the token request
        :param refresh_token: refresh_token field of the token request
        :param grant_type: grant_type field of the token request
        :return: response of the ``token`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            request = GetTokenRequest(redirect_uri=redirect_uri, code=code, username=username, password=password,
                                      refresh_token=refresh_token, grant_type=grant_type)
            return self.identity_stub.token(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in token, probably due to invalid parameters")
            raise

    def get_credentials(self, token, client_id):
        """
        provides IAM and CILogon credentials

        :param token: client credentials
        :param client_id: client id placed in the credentials request
        :return: response of the ``getCredentials`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            request = GetCredentialsRequest(client_id=client_id)
            return self.identity_stub.getCredentials(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in get_credentials, probably due to invalid parameters")
            raise

    def get_oidc_configuration(self, token, client_id):
        """
        send the OIDC config

        :param token: client credentials
        :param client_id: client id placed in the configuration request
        :return: response of the ``getOIDCConfiguration`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            request = GetOIDCConfiguration(client_id=client_id)
            return self.identity_stub.getOIDCConfiguration(request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred in get_OIDC_Configuration, probably due to invalid parameters")
            raise

    def end_user_session(self, token, refresh_token):
        """
        End user session

        :param token: client credentials
        :param refresh_token: refresh token placed in the end-session body
        :return: response of the ``endUserSession`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            # The core EndSessionRequest is wrapped in the integration-level
            # EndSessionRequest (imported as Er) that the stub expects.
            body = EndSessionRequest(refresh_token=refresh_token)
            request = Er(body=body)
            return self.identity_stub.endUserSession(request=request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred while ending user session")
            raise

    def get_agent_token(self, token, client_id, grant_type, refresh_token=None):
        """
        Get agent token

        :param token: base64Encoded(agentId:agentSec)
        :param client_id: parent Client Id
        :param grant_type: client_credentials, refresh_token
        :param refresh_token: refresh token placed in the agent token request
        :return: response of the ``getAgentToken`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            request = GetAgentTokenRequest(client_id=client_id, grant_type=grant_type, refresh_token=refresh_token)
            return self.identity_stub.getAgentToken(request=request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred while fetching agent token")
            raise

    def end_agent_session(self, token, refresh_token):
        """
        End agent session

        :param token: base64Encoded(agentId:agentSec)
        :param refresh_token: refresh token placed in the end-session body
        :return: response of the ``endAgentSession`` RPC
        :raises Exception: whatever the call raised, after logging it
        """
        try:
            metadata = self._bearer_metadata(token)
            # Same request wrapping as end_user_session, sent to the agent RPC.
            body = EndSessionRequest(refresh_token=refresh_token)
            request = Er(body=body)
            return self.identity_stub.endAgentSession(request=request, metadata=metadata)
        except Exception:
            logger.exception("Error occurred while ending agent session")
            raise