in jetstream/argo.py [0:0]
def _get_config(self) -> Configuration:
"""Get the Kubernetes cluster config."""
if not self.cluster_ip and not self.cluster_cert:
cluster_manager_client = ClusterManagerClient()
cluster = cluster_manager_client.get_cluster(
name=f"projects/{self.project_id}/locations/{self.zone}/clusters/{self.cluster_id}"
)
self.cluster_ip = cluster.endpoint
self.cluster_cert = str(cluster.master_auth.cluster_ca_certificate)
elif not (self.cluster_ip and self.cluster_cert):
raise Exception(
"Cluster IP and cluster certificate required when cluster configuration "
"is provided explicitly."
)
creds, projects = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req) # refresh token
with NamedTemporaryFile(delete=False) as ca_cert:
ca_cert.write(base64.b64decode(self.cluster_cert))
return Configuration(
host=f"https://{self.cluster_ip}",
ssl_ca_cert=ca_cert.name,
authorization_key_prefix="Bearer",
authorization_key=creds.token, # valid for one hour
)