google-cloud-jupyter-config/google/cloud/jupyter_config/config.py (94 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import datetime
import json
import subprocess
import sys
import tempfile
import threading
import cachetools
from google.cloud.jupyter_config.tokenrenewer import CommandTokenRenewer
def run_gcloud_subcommand(subcmd):
    """Synchronously execute `gcloud <subcmd>` and return what it printed.

    The argument is everything that would follow the leading `gcloud` on the
    command line (for example `info` rather than `gcloud info`). The command
    is run through the shell.

    The child process gets no stdin, and its stderr is wired to this
    process's stderr so that any messages or prompts from gcloud remain
    visible to the user.

    Args:
        subcmd: The gcloud command line, without the leading `gcloud`.

    Returns:
        The command's stdout, decoded as UTF-8 with surrounding whitespace
        stripped.

    Raises:
        subprocess.CalledProcessError: If gcloud exits with a non-zero status.
    """
    command_line = f"gcloud {subcmd}"
    # stdout is captured in an anonymous temporary file rather than a pipe.
    with tempfile.TemporaryFile() as captured_stdout:
        subprocess.run(
            command_line,
            shell=True,
            check=True,
            encoding="UTF-8",
            stdin=subprocess.DEVNULL,
            stdout=captured_stdout,
            stderr=sys.stderr,
        )
        # Rewind so that we read back what the child process wrote.
        captured_stdout.seek(0)
        raw_output = captured_stdout.read()
        return raw_output.decode("UTF-8").strip()
async def async_run_gcloud_subcommand(subcmd):
    """Run a specified gcloud sub-command without blocking and return its output.

    The supplied subcommand is the full command line invocation, *except* for
    the leading `gcloud` being omitted.

    e.g. `info` instead of `gcloud info`.

    We reuse the system stderr for the command so that any prompts from gcloud
    will be displayed to the user.

    Args:
        subcmd: The gcloud command line, without the leading `gcloud`.

    Returns:
        The command's stdout, decoded as UTF-8 with surrounding whitespace
        stripped.

    Raises:
        subprocess.CalledProcessError: If gcloud exits with a non-zero status.
            The exception's `cmd` attribute holds the full command line that
            was run, matching what `run_gcloud_subcommand` reports.
    """
    command = f"gcloud {subcmd}"
    with tempfile.TemporaryFile() as t:
        p = await asyncio.create_subprocess_shell(
            command,
            stdin=subprocess.DEVNULL,
            stderr=sys.stderr,
            stdout=t,
        )
        await p.wait()
        if p.returncode != 0:
            # Report the failing command so the error is actionable, instead
            # of "Command 'None' returned non-zero exit status ...".
            raise subprocess.CalledProcessError(p.returncode, command)
        # Rewind so that we read back what the child process wrote.
        t.seek(0)
        return t.read().decode("UTF-8").strip()
# Upper bound on the number of distinct subcommands whose output is retained.
_GCLOUD_CACHE_MAX_ENTRIES = 1024
# Lifetime of a cached result: 20 minutes, expressed in seconds.
_GCLOUD_CACHE_TTL_SECONDS = 20 * 60


@cachetools.cached(
    cache=cachetools.TTLCache(
        maxsize=_GCLOUD_CACHE_MAX_ENTRIES,
        ttl=_GCLOUD_CACHE_TTL_SECONDS,
    ),
    lock=threading.Lock(),
)
def cached_gcloud_subcommand(subcmd):
    """Memoized wrapper around `run_gcloud_subcommand`.

    Results are held in a TTL cache (guarded by a lock) for up to 20 minutes,
    so repeated calls with the same subcommand within that window do not
    re-run gcloud.

    Args:
        subcmd: The gcloud command line, without the leading `gcloud`.

    Returns:
        The output of the gcloud command, as returned by
        `run_gcloud_subcommand`.
    """
    output = run_gcloud_subcommand(subcmd)
    return output
def clear_gcloud_cache():
    """Discard every cached gcloud subcommand result.

    After this call, the next cached lookup has to run gcloud again.
    """
    # Empty the cache while holding its lock so that concurrent readers and
    # writers never observe a partially cleared cache.
    with cached_gcloud_subcommand.cache_lock:
        cached_gcloud_subcommand.cache.clear()
def _get_config_field(config, field):
subconfig = config
for path_part in field.split("."):
if path_part:
subconfig = subconfig.get(path_part, {})
return subconfig
def get_gcloud_config(field):
    """Return one value from the output of the gcloud config helper.

    Running gcloud is expensive, so the helper's output is cached for up to
    20 minutes.

    The helper is asked for credentials with at least 30 minutes left before
    expiry. That way, even a cache entry that is about to expire still holds
    credentials that remain usable for roughly another 10 minutes.

    Args:
        field: A period-separated search path for the config value to return.
            For example, 'configuration.properties.core.project'

    Returns:
        A JSON value whose type depends on where the search path ends up
        within the gcloud config.

        For example, `configuration.properties.core.project` yields a string,
        whereas `configuration.properties.core` yields a dictionary that
        contains a `project` entry with a string value.
    """
    helper_output = cached_gcloud_subcommand(
        "config config-helper --min-expiry=30m --format=json"
    )
    parsed_config = json.loads(helper_output)
    return _get_config_field(parsed_config, field)
async def async_get_gcloud_config(field):
    """Async helper method that invokes the gcloud config helper.

    This is like `get_gcloud_config` but does not block on the underlying
    gcloud invocation when there is a cache miss. Both functions share the
    same cache entry, so a result fetched by one is reused by the other.

    Args:
        field: A period-separated search path for the config value to return.
            For example, 'configuration.properties.core.project'

    Returns:
        An awaitable that resolves to a JSON object with a type depending on
        the search path for the field within the gcloud config.

        For example, if the field is `configuration.properties.core.project`,
        then the JSON object will be a string. In comparison, if the field
        is `configuration.properties.core`, then it will be a dictionary
        containing a field named `project` with a string value.
    """
    subcommand = "config config-helper --min-expiry=30m --format=json"
    # The `cachetools.cached` decorator stores entries under a key derived
    # from the call arguments, not under the raw argument itself. Use the same
    # key function so that this coroutine reads and writes the very entry that
    # `cached_gcloud_subcommand(subcommand)` uses.
    cache_key = cached_gcloud_subcommand.cache_key(subcommand)

    with cached_gcloud_subcommand.cache_lock:
        try:
            # A direct lookup (rather than a membership test followed by a
            # lookup) avoids a KeyError if the entry expires in between.
            config_str = cached_gcloud_subcommand.cache[cache_key]
        except KeyError:
            config_str = None

    if config_str is None:
        # Cache miss: run gcloud without holding the lock so other callers
        # are not blocked while the subprocess runs.
        config_str = await async_run_gcloud_subcommand(subcommand)
        with cached_gcloud_subcommand.cache_lock:
            cached_gcloud_subcommand.cache[cache_key] = config_str

    return _get_config_field(json.loads(config_str), field)
def gcp_account():
    """Return the account configured through gcloud."""
    account_field = "configuration.properties.core.account"
    return get_gcloud_config(account_field)
def gcp_credentials():
    """Return the access token reported by the gcloud config helper."""
    token_field = "credential.access_token"
    return get_gcloud_config(token_field)
def gcp_project():
    """Return the project configured through gcloud."""
    project_field = "configuration.properties.core.project"
    return get_gcloud_config(project_field)
def gcp_project_number():
    """Return the project number of the project configured through gcloud.

    The number is obtained by running `gcloud projects describe` for the
    configured project; this lookup does not go through the gcloud cache.
    """
    project_id = gcp_project()
    describe_subcommand = (
        f'projects describe {project_id} --format="value(projectNumber)"'
    )
    return run_gcloud_subcommand(describe_subcommand)
def gcp_region():
    """Return the region configured through gcloud.

    The Dataproc region setting takes precedence; the Compute region setting
    is only consulted when no Dataproc region is configured.
    """
    # `or` short-circuits, so the compute setting is only looked up when the
    # dataproc one is missing or empty.
    return get_gcloud_config(
        "configuration.properties.dataproc.region"
    ) or get_gcloud_config("configuration.properties.compute.region")
def gcp_kernel_gateway_url():
    """Build the kernel gateway URL for the configured project and region.

    Returns:
        A URL of the form
        `https://<project number>-dot-<region>.kernels.googleusercontent.com`.
    """
    project_number = gcp_project_number()
    configured_region = gcp_region()
    gateway_host = (
        f"{project_number}-dot-{configured_region}.kernels.googleusercontent.com"
    )
    return f"https://{gateway_host}"
def configure_gateway_client(c):
    """Set up the given Config object so that it uses the GCP kernel gateway.

    Args:
        c: The Config object to update in place. Its `GatewayClient` and
            `CommandTokenRenewer` sections are modified.
    """
    gateway_url = gcp_kernel_gateway_url()
    renewer_command = (
        'gcloud config config-helper --format="value(credential.access_token)"'
    )

    c.GatewayClient.url = gateway_url

    # Tokens are refreshed by running a gcloud command through the
    # command-based token renewer.
    c.GatewayClient.gateway_token_renewer_class = CommandTokenRenewer
    c.CommandTokenRenewer.token_command = renewer_command

    # Version 2.8.0 of the `jupyter_server` package never invokes the token
    # renewer unless `auth_token` is set to a non-empty value. To work around
    # that, we seed it with a deliberately invalid value, which the token
    # renewer replaces immediately.
    #
    # See https://github.com/jupyter-server/jupyter_server/issues/1339 for more
    # details and discussion.
    c.GatewayClient.auth_token = "Initial, invalid value"

    c.GatewayClient.auth_scheme = "Bearer"
    c.GatewayClient.headers = '{"Cookie": "_xsrf=XSRF", "X-XSRFToken": "XSRF"}'