in airavata_custos_portal/apps/api/views.py [0:0]
def get_custos_api(request, endpoint_path=""):
    """Forward a portal API request to the Custos API and relay the result.

    The outgoing ``Authorization`` header is chosen per endpoint:

    * Token requests (``password``, ``refresh_token`` and
      ``authorization_code`` grants), the OpenID configuration lookup,
      logout and tenant creation use the value returned by
      ``get_client_auth_base64(request)``.
    * Any other request uses ``Bearer <access_token>`` when the session
      holds an ``access_token``; otherwise no ``Authorization`` value is set.
    * Tenant creation carrying a ``parent_client_id`` overrides the above:
      no ``Authorization`` value when the parent is
      ``CUSTOS_SUPER_CLIENT_ID``, else the value returned by
      ``get_client_auth_base64`` for that parent client id.

    For ``refresh_token`` grants and logout, the session's ``refresh_token``
    is added to the forwarded payload, replacing any client-supplied value.

    Args:
        request: Incoming request. Its ``method``, ``data``, ``GET`` and
            ``session`` are read.
        endpoint_path: Path appended to ``CUSTOS_API_URL``.

    Returns:
        A ``Response`` carrying the upstream status code and:

        * an empty dict for token requests (the upstream response is handed
          to ``set_token_response_session``);
        * ``{"authorization_endpoint": <login url>}`` for the OpenID
          configuration lookup, or the upstream payload unchanged when it
          has no ``authorization_endpoint``;
        * the upstream JSON payload otherwise (``None`` when the upstream
          body is not valid JSON). For logout,
          ``remove_token_response_session`` is called first.

    Raises:
        KeyError: For ``refresh_token`` grants and logout when the session
            has no ``refresh_token``.
    """
    CUSTOS_REDIRECT_URI = request.build_absolute_uri('/api/callback')
    client_auth_base64 = get_client_auth_base64(request)

    # The grant type only matters on the identity token endpoint.
    grant_type = None
    if endpoint_path == f"{ENDPOINTS['IDENTITY']}/token" and "grant_type" in request.data:
        grant_type = request.data["grant_type"]

    client_auth_cases_map = {
        "token_password": grant_type == "password",
        "token_refresh_token": grant_type == "refresh_token",
        "token_authorization_code": grant_type == "authorization_code",
        "token_openid-configuration": endpoint_path == f"{ENDPOINTS['IDENTITY']}/.well-known/openid-configuration",
        "logout": endpoint_path == f"{ENDPOINTS['IDENTITY']}/user/logout",
        "tenant_create": endpoint_path == f"{ENDPOINTS['TENANTS']}/oauth2/tenant" and request.method == "POST"
    }

    authorization_header = None
    if any(client_auth_cases_map.values()):
        authorization_header = client_auth_base64
    elif "access_token" in request.session:
        authorization_header = f"Bearer {request.session['access_token']}"

    # Creating a child tenant authenticates as its parent rather than as
    # the portal's own client.
    if client_auth_cases_map["tenant_create"] and "parent_client_id" in request.data:
        if request.data["parent_client_id"] == CUSTOS_SUPER_CLIENT_ID:
            authorization_header = None
        else:
            authorization_header = get_client_auth_base64(request, client_id=request.data["parent_client_id"])

    # requests omits headers whose value is None, so a None here means
    # "send no Authorization header".
    headers = {
        'Accept': '*/*',
        'Content-Type': 'application/json',
        'Authorization': authorization_header
    }
    url = f"{CUSTOS_API_URL}/{endpoint_path}?{request.GET.urlencode()}"

    data = request.data
    if client_auth_cases_map["token_refresh_token"] or client_auth_cases_map["logout"]:
        # Work on a copy: request.data can be an immutable QueryDict (item
        # assignment raises), and the incoming payload should not be
        # modified in place. QueryDict.copy() returns a mutable copy.
        data = request.data.copy()
        data['refresh_token'] = request.session['refresh_token']

    # NOTE(review): no timeout is passed, so a stalled upstream blocks this
    # request indefinitely -- consider adding an explicit ``timeout=``.
    response = requests.request(
        method=request.method,
        url=url,
        json=data,
        headers=headers
    )

    if client_auth_cases_map["token_password"] or client_auth_cases_map["token_authorization_code"] or \
            client_auth_cases_map["token_refresh_token"]:
        # The parsed body is not needed here; the raw response is passed on.
        set_token_response_session(request, response)
        return Response(data={}, status=response.status_code)

    # Upstream may answer with an empty or non-JSON body (e.g. on logout or
    # on a gateway error). Relay the status instead of failing, and make
    # sure the logout clean-up below still runs.
    try:
        response_json = response.json()
    except ValueError:
        response_json = None

    if client_auth_cases_map["token_openid-configuration"]:
        if not isinstance(response_json, dict) or "authorization_endpoint" not in response_json:
            # Nothing to build a login URL from; relay what upstream sent.
            return Response(data=response_json, status=response.status_code)

        authorization_endpoint = response_json["authorization_endpoint"]
        url = f"{authorization_endpoint}?response_type=code&client_id={CUSTOS_CLIENT_ID}&redirect_uri={CUSTOS_REDIRECT_URI}&scope=openid"
        ci_logon_institution_entity_id = False  # TODO
        if ci_logon_institution_entity_id:
            url = f"{url}&kc_idp_hint=oidc&idphint={ci_logon_institution_entity_id}"
        else:
            url = f"{url}&kc_idp_hint=oidc"
        return Response({"authorization_endpoint": url}, status=response.status_code)

    if client_auth_cases_map["logout"]:
        remove_token_response_session(request)

    return Response(data=response_json, status=response.status_code)