ad-joining/register-computer/gcp/auth.py (46 lines of code) (raw):
#
# Copyright 2019 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import google.oauth2.id_token
import google.auth.transport.requests
class AuthorizationException(Exception):
pass
class InvalidIssuerException(AuthorizationException):
pass
class InvalidTokenException(AuthorizationException):
pass
class IncompleteTokenException(AuthorizationException):
pass
class InvalidAuthorizationHeaderException(AuthorizationException):
pass
class AuthenticationInfo(object):
def __init__(self, claims):
self.__claims = claims
@staticmethod
def from_authorization_header(header, audience, require_google_claim=True):
prefix = "Bearer "
if not header.startswith(prefix):
raise InvalidAuthorizationHeaderException("Unrecognized Authorization header")
return AuthenticationInfo.from_idtoken(header[len(prefix):], audience, require_google_claim)
@staticmethod
def from_idtoken(idtoken, audience, require_google_claim=True):
try:
# Validate that the token...
# - comes from Google (iss)
# - is authentic (signature check)
# - is valid (iat, exp)
# - is intended for us (aud)
token_info = google.oauth2.id_token.verify_oauth2_token(
idtoken,
google.auth.transport.requests.Request(),
audience)
except ValueError as e:
raise InvalidTokenException(e)
if token_info["iss"] not in ["accounts.google.com", "https://accounts.google.com"]:
raise InvalidIssuerException("Wrong issuer: '%s'" % token_info["iss"])
if require_google_claim:
# Expect a "full" token issued for a VM instance as documented here:
# https://cloud.google.com/compute/docs/instances/verifying-instance-identity#token_format
if not "google" in token_info:
raise IncompleteTokenException("Missing extended claims in token")
return AuthenticationInfo(token_info)
def get_issuer(self):
return self.__claims["iss"]
def get_email(self):
return self.__claims["email"]
def get_project_id(self):
return self.__claims["google"]["compute_engine"]["project_id"]
def get_instance_name(self):
return self.__claims["google"]["compute_engine"]["instance_name"]
def get_zone(self):
return self.__claims["google"]["compute_engine"]["zone"]