modules/jupyter/authentication/authenticator/gcpiapjwtauthenticator/gcpiapjwtauthenticator.py (97 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from jupyterhub.handlers import BaseHandler
from jupyterhub.auth import Authenticator
from jupyterhub.utils import url_path_join
import requests
import logging
from tornado import web
from traitlets import Unicode
from urllib import parse
from google.auth.transport import requests
from google.oauth2 import id_token
from googleapiclient import discovery
import google.auth
def list_backend_services_ids(project_id, keyword):
    """Return the IDs of Compute Engine backend services whose name contains *keyword*.

    Uses Application Default Credentials to call the Compute Engine v1
    ``backendServices.list`` API for the given project and walks every result
    page, so matches beyond the first page are not lost.

    Args:
        project_id: The GCP project whose backend services are listed.
        keyword: Substring to look for in each backend service name. The
            comparison is case-insensitive.

    Returns:
        A list with the ``id`` field of every matching backend service, in
        the order the API returned them. Empty when nothing matches.
    """
    credentials, _ = google.auth.default()
    compute = discovery.build('compute', 'v1', credentials=credentials)
    backend_services = compute.backendServices()

    # Lowercase once instead of once per listed service.
    needle = keyword.lower()
    matching_ids = []

    request = backend_services.list(project=project_id)
    # list_next() returns None once there are no further pages.
    while request is not None:
        response = request.execute()
        matching_ids.extend(
            item['id']
            for item in response.get('items', [])
            if needle in item['name'].lower()
        )
        request = backend_services.list_next(
            previous_request=request,
            previous_response=response,
        )
    return matching_ids
class IAPUserLoginHandler(BaseHandler):
    """Login handler that signs a user in from the IAP-signed JWT header.

    The handler reads its settings (header name, project ID/number, namespace
    and service name) from ``self.authenticator``.
    """

    def get(self):
        """Validate the IAP JWT on the request and log the user in.

        The expected JWT audiences are derived from the backend services in
        the configured project whose name contains
        ``"<namespace>-<service_name>"``. On success the login cookie is set
        and the client is redirected to the hub's ``home`` page.

        Raises:
            web.HTTPError: 400 if the configured header is not
                ``X-Goog-IAP-JWT-Assertion`` or the header is missing/empty;
                401 if the JWT fails validation.
        """
        authenticator = self.authenticator
        header_name = authenticator.header_name

        # Reject misconfigured or unauthenticated requests up front, before
        # spending a remote Compute API call on them.
        if header_name != "X-Goog-IAP-JWT-Assertion":
            raise web.HTTPError(400, 'X-Goog-IAP-JWT-Assertion is the only accepted Header')
        auth_header_content = self.request.headers.get(header_name, "")
        if not auth_header_content:
            raise web.HTTPError(400, 'Can not verify the IAP authentication content.')

        project_id = authenticator.project_id
        namespace = authenticator.namespace
        service_name = authenticator.service_name

        # Backend services are matched by a keyword built from the namespace
        # and the service name.
        keyword = namespace + "-" + service_name
        logging.info(
            "Project ID: %s, Namespace: %s, Backend Config Name: %s, Keyword: %s",
            project_id, namespace, service_name, keyword,
        )

        # This lookup is performed on every login request.
        gcp_backend_services_ids = list_backend_services_ids(project_id, keyword)
        logging.info("GCP Backend Services IDs: %s", gcp_backend_services_ids)

        expected_audiences = [
            f"/projects/{authenticator.project_number}/global/backendServices/{service_id}"
            for service_id in gcp_backend_services_ids
        ]
        logging.info("Expected Audiences: %s", expected_audiences)

        _, user_email, err = validate_iap_jwt(auth_header_content, expected_audiences)
        if err:
            # A token that fails validation is an authentication failure, not
            # a server fault, so answer 401 instead of an unhandled 500.
            logging.warning("IAP JWT validation failed: %s", err)
            raise web.HTTPError(401, f'Ran into error: {err}')
        logging.info('Successfully validated!')

        # NOTE(review): only the local part of the e-mail address is used as
        # the username, so the same local part under two different domains
        # maps to the same hub user -- confirm IAP access is limited to a
        # single domain.
        username = user_email.lower().split("@")[0]
        user = self.user_from_username(username)
        self.set_login_cookie(user)
        self.redirect(url_path_join(self.hub.server.base_url, 'home'))
class GCPIAPAuthenticator(Authenticator):
    """
    JupyterHub Authenticator backed by the JSON Web Token issued by IAP.

    JupyterHub uses this class as its authentication class; the login route
    it should serve is advertised through ``get_handlers``.
    """

    # --- Where the JWT is looked up on the incoming request ---------------
    header_name = Unicode(
        help="HTTP/HTTPS header to inspect for the authenticated JWT.",
        config=True,
    )
    cookie_name = Unicode(
        help="The name of the cookie field used to specify the JWT token",
        config=True,
    )
    param_name = Unicode(
        help="The name of the query parameter used to specify the JWT token",
        config=True,
    )

    # --- GCP / deployment identifiers (all default to the empty string) ---
    project_id = Unicode(
        help="Expected project_id",
        config=True,
        default_value='',
    )
    project_number = Unicode(
        help="Expected project_number",
        config=True,
        default_value='',
    )
    namespace = Unicode(
        help="Expected namespace",
        config=True,
        default_value='',
    )
    service_name = Unicode(
        help="Expected backend_config_name",
        config=True,
        default_value='',
    )

    # --- Signing material --------------------------------------------------
    secret = Unicode(
        help="Shared secret key for signing JWT token",
        config=True,
    )

    def get_handlers(self, app):
        """Return the (route pattern, handler class) pairs for this authenticator."""
        login_route = (r'login', IAPUserLoginHandler)
        return [login_route]
def validate_iap_jwt(iap_jwt, expected_audiences):
    """Validate an IAP JWT.

    Args:
        iap_jwt: The contents of the X-Goog-IAP-JWT-Assertion header.
        expected_audiences: The Signed Header JWT audiences. See
            https://cloud.google.com/iap/docs/signed-headers-howto
            for details on how to get this value. Must be non-empty.

    Returns:
        (user_id, user_email, error_str). On success ``error_str`` is the
        empty string; on any failure ``user_id`` and ``user_email`` are
        ``None`` and ``error_str`` describes the problem. This function does
        not raise for validation failures.
    """
    # Fail closed: google-auth documents ``audience=None`` as "audience is
    # not verified", so never hand an empty/None audience to verify_token.
    if not expected_audiences:
        return (None, None, "JWT validation error no expected audiences to verify against")

    try:
        decoded_jwt = id_token.verify_token(
            iap_jwt,
            requests.Request(),
            audience=expected_audiences,
            certs_url="https://www.gstatic.com/iap/verify/public_key",
        )
        return (decoded_jwt["sub"], decoded_jwt["email"], "")
    except Exception as e:
        # Deliberately broad: every failure (bad signature, wrong audience,
        # missing claim, transport error) is reported through the error
        # string rather than raised, as the return contract promises.
        return (None, None, f"JWT validation error {e}")