airavata_custos_portal/apps/api/views.py (149 lines of code) (raw):
from django.conf import settings
from django.http import HttpResponse
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.shortcuts import redirect
import requests
import base64
import jwt
# Custos connection settings, read once from Django settings when this module
# is imported.
# Client id/secret pair used by get_client_auth_base64 when no explicit
# client is given.
CUSTOS_CLIENT_ID = settings.CUSTOS_CLIENT_ID
CUSTOS_CLIENT_SEC = settings.CUSTOS_CLIENT_SEC
# Base URL that every upstream Custos request in this module is built from.
CUSTOS_API_URL = settings.CUSTOS_API_URL
# Tenant creation compares the requested parent client id against this value.
CUSTOS_SUPER_CLIENT_ID = settings.CUSTOS_SUPER_CLIENT_ID
# NOTE(review): not referenced anywhere else in the visible part of this
# module; get_config reports a hard-coded 0 for VUE_APP_UNDER_MAINTENANCE.
UNDER_MAINTENANCE = settings.UNDER_MAINTENANCE
# Versioned path prefix of each Custos service, relative to CUSTOS_API_URL.
ENDPOINTS = dict(
    IDENTITY="identity-management/v1.0.0",
    USERS="user-management/v1.0.0",
    GROUPS="group-management/v1.0.0",
    TENANTS="tenant-management/v1.0.0",
    SHARING="sharing-management/v1.0.0",
    SECRETS="resource-secret-management/v1.0.0",
)
# Create your views here.
@api_view()
def get_config(request):
    """Return the VUE_APP_* configuration values for the front end.

    The Custos API URL handed out is this portal's own ``/api/custos`` path,
    made absolute against the incoming request.
    """
    portal_api_url = request.build_absolute_uri('/api/custos')
    config = {
        "VUE_APP_CLIENT_ID": CUSTOS_CLIENT_ID,
        "VUE_APP_CUSTOS_API_URL": portal_api_url,
        "VUE_APP_SUPER_CLIENT_ID": CUSTOS_SUPER_CLIENT_ID,
        # NOTE(review): hard-coded; the UNDER_MAINTENANCE setting loaded at
        # module level is not used here -- confirm whether that is intended.
        "VUE_APP_UNDER_MAINTENANCE": 0,
    }
    return Response(config)
@api_view()
def get_userinfo(request):
    """Return the claims of the access token stored in the session.

    Responds with a plain 401 when the session holds no access token.

    The token is decoded without any signature or claim verification; it is
    only read back from the server-side session, never taken from the
    request itself.
    """
    if 'access_token' not in request.session:
        return HttpResponse('Unauthorized', status=401)

    # The ``verify=False`` keyword is not accepted by PyJWT 2.x, where
    # signature checking then stays on and decode() fails for lack of
    # ``algorithms``. Spelling every check out through ``options`` skips
    # verification on both old and new PyJWT releases.
    unverified = {
        "verify_signature": False,
        "verify_exp": False,
        "verify_nbf": False,
        "verify_iat": False,
        "verify_aud": False,
        "verify_iss": False,
    }
    payload = jwt.decode(request.session['access_token'], options=unverified)
    return Response(payload)
def get_client_sec(request, client_id):
    """Look up the secret of ``client_id`` at the Custos credentials endpoint.

    The call is authorized with the access token held in the session, so a
    session without ``access_token`` raises ``KeyError``. Returns the
    ``custos_client_secret`` field of the JSON reply.
    """
    credentials_url = f"{CUSTOS_API_URL}/{ENDPOINTS['IDENTITY']}/credentials"
    request_headers = {
        'Accept': '*/*',
        'Content-Type': 'application/json',
        'Authorization': f"Bearer {request.session['access_token']}"
    }
    reply = requests.get(
        url=credentials_url,
        params={'client_id': client_id},
        headers=request_headers,
    )
    # NOTE(review): no status check -- an error reply without the field
    # surfaces as KeyError here.
    return reply.json()["custos_client_secret"]
def get_client_auth_base64(request, client_id=None, client_sec=None):
    """Build the ``Authorization`` header value for client authentication.

    Credential resolution:
      * neither argument given -> the portal's own client id and secret;
      * only ``client_id`` given -> its secret is fetched via get_client_sec;
      * ``client_sec`` given -> both values are used exactly as passed.

    Returns ``"Bearer "`` followed by base64 of ``"<client_id>:<client_sec>"``.
    """
    if client_sec is None:
        if client_id is None:
            client_id, client_sec = CUSTOS_CLIENT_ID, CUSTOS_CLIENT_SEC
        else:
            client_sec = get_client_sec(request, client_id)

    raw_credentials = f"{client_id}:{client_sec}".encode("utf-8")
    encoded_credentials = base64.b64encode(raw_credentials).decode('utf-8')
    return f"Bearer {encoded_credentials}"
@api_view()
def get_auth_callback(request):
    """Handle the OAuth redirect: trade the ``code`` for tokens, then go home.

    The authorization code from the query string is exchanged at the Custos
    token endpoint using the portal's own client credentials. On success the
    tokens are stored in the session and the browser is redirected to ``/``.

    Responds with a plain 401 when the redirect carries no ``code`` or when
    the token endpoint rejects the exchange; previously both cases crashed
    with ``KeyError`` while reading tokens out of the error reply.
    """
    code = request.GET.get("code")
    if code is None:
        # Nothing to exchange, e.g. the identity provider reported an error.
        return HttpResponse('Unauthorized', status=401)

    # Must match the redirect URI used when the login was started.
    redirect_uri = request.build_absolute_uri('/api/callback')
    response = requests.post(
        url=f"{CUSTOS_API_URL}/{ENDPOINTS['IDENTITY']}/token",
        json={
            'code': code,
            'redirect_uri': redirect_uri,
            'grant_type': 'authorization_code',
        },
        headers={
            'Accept': '*/*',
            'Content-Type': 'application/json',
            'Authorization': get_client_auth_base64(request),
        },
    )
    if not response.ok:
        # An error reply carries no tokens to store in the session.
        return HttpResponse('Unauthorized', status=401)

    set_token_response_session(request, response)
    return redirect("/")
def set_token_response_session(request, response):
    """Copy the tokens of a token-endpoint reply into the session.

    The session expiry is set to the reply's ``expires_in`` value first, then
    ``access_token``, ``refresh_token`` and ``id_token`` are stored in that
    order. A reply missing any of these fields raises ``KeyError``.
    """
    token_payload = response.json()
    session = request.session
    session.set_expiry(token_payload["expires_in"])
    for token_name in ("access_token", "refresh_token", "id_token"):
        session[token_name] = token_payload[token_name]
def remove_token_response_session(request):
    """Drop the access, refresh and id tokens from the session.

    Keys that are already absent are skipped rather than raising
    ``KeyError``, so clearing a partially populated session still removes
    whatever tokens remain.
    """
    for token_name in ('access_token', 'refresh_token', 'id_token'):
        request.session.pop(token_name, None)
# Short resource name -> versioned Custos path.
# NOTE(review): not referenced by any view in the visible part of this module.
custos_resource_map = dict(
    credentials=f"{ENDPOINTS['IDENTITY']}/credentials",
)
@api_view(["GET", "POST", "PUT", "DELETE"])
def get_custos_api(request, endpoint_path=""):
    """Proxy a request to the Custos API with the appropriate credentials.

    The incoming method, query string and body are forwarded to
    ``CUSTOS_API_URL/<endpoint_path>``. The ``Authorization`` header is chosen
    as follows:

      * token, openid-configuration, logout and tenant-creation calls use the
        portal's client credentials;
      * tenant creation with a ``parent_client_id`` uses that parent's
        credentials, or no header at all when the parent is the super client;
      * every other call uses the session's access token, if there is one.

    Special handling of the reply:

      * token grants: the tokens are stored in the session and an empty body
        is returned, so they are not included in the reply to the caller;
      * openid-configuration: only a ready-to-use authorization URL is
        returned;
      * logout: the session tokens are removed.

    Refresh and logout calls take the refresh token from the session and
    answer 401 when the session has none. Upstream error replies and
    empty/non-JSON bodies are passed through with the upstream status instead
    of crashing with ``KeyError``/``ValueError``.
    """
    identity = ENDPOINTS['IDENTITY']

    # The grant type only matters on the token endpoint.
    grant_type = None
    if endpoint_path == f"{identity}/token" and "grant_type" in request.data:
        grant_type = request.data["grant_type"]

    client_auth_cases_map = {
        "token_password": grant_type == "password",
        "token_refresh_token": grant_type == "refresh_token",
        "token_authorization_code": grant_type == "authorization_code",
        "token_openid-configuration": endpoint_path == f"{identity}/.well-known/openid-configuration",
        "logout": endpoint_path == f"{identity}/user/logout",
        "tenant_create": endpoint_path == f"{ENDPOINTS['TENANTS']}/oauth2/tenant" and request.method == "POST"
    }
    is_token_grant = (
        client_auth_cases_map["token_password"]
        or client_auth_cases_map["token_authorization_code"]
        or client_auth_cases_map["token_refresh_token"]
    )

    if any(client_auth_cases_map.values()):
        authorization_header = get_client_auth_base64(request)
    elif "access_token" in request.session:
        authorization_header = f"Bearer {request.session['access_token']}"
    else:
        authorization_header = None

    if client_auth_cases_map["tenant_create"] and "parent_client_id" in request.data:
        parent_client_id = request.data["parent_client_id"]
        if parent_client_id == CUSTOS_SUPER_CLIENT_ID:
            authorization_header = None
        else:
            # Fetches the parent's secret with the session access token.
            authorization_header = get_client_auth_base64(request, client_id=parent_client_id)

    headers = {
        'Accept': '*/*',
        'Content-Type': 'application/json',
        'Authorization': authorization_header
    }
    url = f"{CUSTOS_API_URL}/{endpoint_path}?{request.GET.urlencode()}"

    data = request.data
    if client_auth_cases_map["token_refresh_token"] or client_auth_cases_map["logout"]:
        # The refresh token comes from the session, not from the request body.
        if 'refresh_token' not in request.session:
            return HttpResponse('Unauthorized', status=401)
        data['refresh_token'] = request.session['refresh_token']

    # NOTE(review): no timeout is set, so a stalled upstream blocks this view.
    response = requests.request(
        method=request.method,
        url=url,
        json=data,
        headers=headers
    )

    try:
        response_json = response.json()
    except ValueError:
        # Empty or non-JSON upstream body.
        response_json = None

    # Token and discovery fields can only be read from a successful JSON
    # object; anything else falls through to the plain pass-through below.
    has_usable_body = response.ok and isinstance(response_json, dict)

    if is_token_grant and has_usable_body:
        set_token_response_session(request, response)
        return Response(data={}, status=response.status_code)

    if client_auth_cases_map["token_openid-configuration"] and has_usable_body:
        redirect_uri = request.build_absolute_uri('/api/callback')
        authorization_endpoint = response_json["authorization_endpoint"]
        url = f"{authorization_endpoint}?response_type=code&client_id={CUSTOS_CLIENT_ID}&redirect_uri={redirect_uri}&scope=openid"
        ciLogonInstitutionEntityId = False  # TODO
        if ciLogonInstitutionEntityId:
            url = f"{url}&kc_idp_hint=oidc&idphint={ciLogonInstitutionEntityId}"
        else:
            url = f"{url}&kc_idp_hint=oidc"
        return Response({"authorization_endpoint": url}, status=response.status_code)

    if client_auth_cases_map["logout"]:
        # The local tokens are dropped whatever the upstream answered.
        remove_token_response_session(request)

    return Response(data=response_json, status=response.status_code)