in django_airavata/apps/auth/backends.py [0:0]
def _store_idp_userinfo(self, user, token, idp_alias):
try:
idp_token_url = None
userinfo_url = None
for auth_option in settings.AUTHENTICATION_OPTIONS['external']:
if auth_option['idp_alias'] == idp_alias:
idp_token_url = auth_option.get('idp_token_url')
userinfo_url = auth_option.get('userinfo_url')
break
if idp_token_url is None or userinfo_url is None:
logger.debug(f"idp_token_url and/or userinfo_url not set for {idp_alias} "
"in AUTHENTICATION_OPTIONS, skipping retrieval of external IDP userinfo")
return
access_token = token['access_token']
logger.debug(f"access_token={access_token} for idp_alias={idp_alias}")
# fetch the idp's token
headers = {'Authorization': f'Bearer {access_token}'}
# For the following to work, in Keycloak the IDP should have 'Store
# Tokens' and 'Stored Tokens Readable' enabled and the user needs
# the broker/read-token role
r = requests.get(idp_token_url, headers=headers)
idp_token = r.json()
idp_headers = {'Authorization': f"Bearer {idp_token['access_token']}"}
r = requests.get(userinfo_url, headers=idp_headers)
userinfo = r.json()
logger.debug(f"userinfo={userinfo}")
# Save the idp user info claims
user_profile = user.user_profile
for (claim, value) in userinfo.items():
if user_profile.idp_userinfo.filter(idp_alias=idp_alias, claim=claim).exists():
userinfo_claim = user_profile.idp_userinfo.get(idp_alias=idp_alias, claim=claim)
userinfo_claim.value = value
userinfo_claim.save()
else:
user_profile.idp_userinfo.create(idp_alias=idp_alias, claim=claim, value=value)
except Exception:
logger.exception(f"Failed to store IDP userinfo for {user.username} from IDP {idp_alias}")