in providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py [0:0]
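def get_oauth_user_info(self, provider: str, resp: dict[str, Any]) -> dict[str, Any]:
    """
    Map a provider's identity data onto the user-info dict for an OAuth login.

    Each provider exposes identity data differently, so every branch fetches
    (or decodes) it in its own way and maps it onto the keys used here:
    ``username``, ``email``, ``first_name``, ``last_name`` and ``role_keys``.

    ``username`` is what identifies the account. A branch therefore never
    returns a username consisting of the provider prefix alone (``"google_"``,
    ``"twitter_"``, ...): a response without its identifier field, such as an
    error body, yields ``{}`` instead, so unrelated logins cannot collapse
    onto one shared username.

    The raw provider payload is written to the debug log and contains
    personal data, so debug output of this module should be handled as
    sensitive.

    :param provider: name of the configured OAuth provider.
    :param resp: token response; only ``resp["id_token"]`` is read (azure
        and authentik branches).
    :return: the mapped user info, or ``{}`` for an unknown provider, an
        Okta error response, or a response lacking its identifier field.
    :raises KeyError: when a claim read with ``[...]`` is absent (azure,
        okta, auth0, authentik).
    """
    # for GITHUB
    if provider in ("github", "githublocal"):
        me = self.oauth_remotes[provider].get("user")
        data = me.json()
        log.debug("User info from GitHub: %s", data)
        login = data.get("login")
        if not login:
            # An error body has no "login"; concatenating None would raise TypeError.
            log.error("GitHub user info has no 'login'; not building a username")
            return {}
        return {"username": "github_" + login}
    # for twitter
    if provider == "twitter":
        me = self.oauth_remotes[provider].get("account/settings.json")
        data = me.json()
        log.debug("User info from Twitter: %s", data)
        screen_name = data.get("screen_name")
        if not screen_name:
            log.error("Twitter user info has no 'screen_name'; not building a username")
            return {}
        return {"username": "twitter_" + screen_name}
    # for linkedin
    if provider == "linkedin":
        me = self.oauth_remotes[provider].get(
            "people/~:(id,email-address,first-name,last-name)?format=json"
        )
        data = me.json()
        log.debug("User info from LinkedIn: %s", data)
        linkedin_id = data.get("id")
        if not linkedin_id:
            log.error("LinkedIn user info has no 'id'; not building a username")
            return {}
        return {
            "username": "linkedin_" + linkedin_id,
            "email": data.get("email-address", ""),
            "first_name": data.get("firstName", ""),
            "last_name": data.get("lastName", ""),
        }
    # for Google
    if provider == "google":
        me = self.oauth_remotes[provider].get("userinfo")
        data = me.json()
        log.debug("User info from Google: %s", data)
        google_id = data.get("id")
        if not google_id:
            log.error("Google user info has no 'id'; not building a username")
            return {}
        return {
            "username": "google_" + google_id,
            "first_name": data.get("given_name", ""),
            "last_name": data.get("family_name", ""),
            "email": data.get("email", ""),
        }
    if provider == "azure":
        # NOTE(review): signature/issuer/audience checks are presumably done inside
        # the helper, as its name suggests; confirm there.
        me = self._decode_and_validate_azure_jwt(resp["id_token"])
        log.debug("User info from Azure: %s", me)
        # https://learn.microsoft.com/en-us/azure/active-directory/develop/id-token-claims-reference#payload-claims
        # The account key is "oid". That reference describes "email" as mutable and
        # not to be used for authorization, so email/upn only fill the contact field.
        return {
            "email": me["email"] if "email" in me else me["upn"],
            "first_name": me.get("given_name", ""),
            "last_name": me.get("family_name", ""),
            "username": me["oid"],
            "role_keys": me.get("roles", []),
        }
    # for OpenShift
    if provider == "openshift":
        me = self.oauth_remotes[provider].get("apis/user.openshift.io/v1/users/~")
        data = me.json()
        log.debug("User info from OpenShift: %s", data)
        # A response without "metadata" would make the chained .get() raise AttributeError.
        name = (data.get("metadata") or {}).get("name")
        if not name:
            log.error("OpenShift user info has no 'metadata.name'; not building a username")
            return {}
        return {"username": "openshift_" + name}
    # for Okta
    if provider == "okta":
        me = self.oauth_remotes[provider].get("userinfo")
        data = me.json()
        log.debug("User info from Okta: %s", data)
        if "error" not in data:
            return {
                "username": f"{provider}_{data['sub']}",
                "first_name": data.get("given_name", ""),
                "last_name": data.get("family_name", ""),
                "email": data["email"],
                "role_keys": data.get("groups", []),
            }
        log.error(data.get("error_description"))
        return {}
    # for Auth0
    if provider == "auth0":
        data = self.appbuilder.sm.oauth_remotes[provider].userinfo()
        log.debug("User info from Auth0: %s", data)
        return {
            "username": f"{provider}_{data['sub']}",
            "first_name": data.get("given_name", ""),
            "last_name": data.get("family_name", ""),
            "email": data["email"],
            "role_keys": data.get("groups", []),
        }
    # for Keycloak
    if provider in ("keycloak", "keycloak_before_17"):
        me = self.oauth_remotes[provider].get("openid-connect/userinfo")
        # The only branch that rejects a non-2xx userinfo response before parsing it.
        me.raise_for_status()
        data = me.json()
        log.debug("User info from Keycloak: %s", data)
        return {
            "username": data.get("preferred_username", ""),
            "first_name": data.get("given_name", ""),
            "last_name": data.get("family_name", ""),
            "email": data.get("email", ""),
            "role_keys": data.get("groups", []),
        }
    # for Authentik
    if provider == "authentik":
        id_token = resp["id_token"]
        # NOTE(review): whether the helper verifies the token signature is not
        # visible from this function; confirm there.
        me = self._get_authentik_token_info(id_token)
        log.debug("User info from authentik: %s", me)
        # NOTE(review): "email" is filled from preferred_username, which is only an
        # address if the authentik instance is configured that way; confirm.
        return {
            "email": me["preferred_username"],
            "first_name": me.get("given_name", ""),
            "username": me["nickname"],
            "role_keys": me.get("groups", []),
        }
    # Unknown provider: no user info.
    return {}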