in src/dispatch/plugins/dispatch_core/plugin.py [0:0]
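def get_current_user(self, request: Request, **kwargs):
    """Return the e-mail claim of the bearer JWT carried by ``request``.

    The token is read from the ``Authorization`` header. The signing key is
    picked from the provider's JWKS document by the ``kid`` in the token
    header, and the token is then verified with ``jwt.decode``.

    Every failure on this path is reported as the same 401 response. A
    malformed token, an unknown ``kid``, an unreachable JWKS endpoint or a
    missing e-mail claim must not surface as an unhandled exception (HTTP 500).

    Args:
        request: Incoming request whose headers carry the bearer token.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The claim named by ``DISPATCH_JWT_EMAIL_OVERRIDE`` when that setting
        is truthy, otherwise the ``email`` claim of the verified token.

    Raises:
        HTTPException: 401 when the header is missing or not a bearer token,
            or when the token cannot be parsed, matched to a key or verified.
    """
    credentials_exception = HTTPException(
        status_code=HTTP_401_UNAUTHORIZED, detail=[{"msg": "Could not validate credentials"}]
    )

    authorization: str = request.headers.get(
        "Authorization", request.headers.get("authorization")
    )
    scheme, param = get_authorization_scheme_param(authorization)
    if not authorization or scheme.lower() != "bearer":
        raise credentials_exception

    # A header holding only the scheme ("Bearer") has no second field; reject
    # it instead of letting the index lookup raise.
    header_fields = authorization.split()
    if len(header_fields) < 2:
        raise credentials_exception
    token = header_fields[1]

    # Read the key id from the (still unverified) JOSE header. JWT segments are
    # base64url without padding, so use the URL-safe alphabet and restore the
    # exact padding; the standard alphabet silently drops '-' and '_'.
    try:
        header_segment = token.split(".")[0]
        header_segment += "=" * (-len(header_segment) % 4)
        key_info = json.loads(base64.urlsafe_b64decode(header_segment).decode("utf-8"))
        kid = key_info["kid"]
    except (ValueError, KeyError, TypeError) as err:
        log.debug("JWT header parse error: %s", err)
        raise credentials_exception from err

    # Grab all published keys to account for key rotation and find the right one.
    # The kid is attacker-controlled, but it only selects among keys the
    # provider publishes; it never supplies key material itself.
    # requests has no default timeout, so bound the wait: a stalled JWKS
    # endpoint must not hang every authenticated request.
    # NOTE(review): the JWKS document is fetched on every call; consider a
    # short-lived cache so authentication does not depend on a per-request
    # round trip to the provider.
    try:
        keys = requests.get(DISPATCH_AUTHENTICATION_PROVIDER_PKCE_JWKS, timeout=10).json()["keys"]
        key = next(
            (
                candidate
                for candidate in keys
                if isinstance(candidate, dict)
                and "kid" in candidate
                and candidate["kid"] == kid
            ),
            None,
        )
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
        log.warning("JWKS retrieval error: %s", err)
        raise credentials_exception from err

    # No published key carries this kid: fail closed rather than continue
    # with an unbound key.
    if key is None:
        log.debug("JWT references unknown key id")
        raise credentials_exception

    try:
        jwt_opts = {}
        if DISPATCH_PKCE_DONT_VERIFY_AT_HASH:
            jwt_opts = {"verify_at_hash": False}
        # NOTE(review): no `algorithms=` allow-list is passed, so the signature
        # algorithm presumably comes from the token header. Confirm, and pin it
        # to the provider's signing algorithm. Issuer is not checked here either.
        # NOTE(review): if the library reports an unusable key with an exception
        # that is not a JWTError, it escapes this handler. Confirm and handle.
        # If DISPATCH_JWT_AUDIENCE is defined, then we must include audience in the decode
        if DISPATCH_JWT_AUDIENCE:
            data = jwt.decode(token, key, audience=DISPATCH_JWT_AUDIENCE, options=jwt_opts)
        else:
            data = jwt.decode(token, key, options=jwt_opts)
    except JWTError as err:
        log.debug("JWT Decode error: {}".format(err))
        raise credentials_exception from err

    # Support overriding where email is returned in the id token
    email_claim = DISPATCH_JWT_EMAIL_OVERRIDE or "email"
    try:
        return data[email_claim]
    except KeyError as err:
        # A verified token without the expected claim identifies no user.
        log.debug("JWT is missing the e-mail claim: %s", err)
        raise credentials_exception from err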