mysqloperator/controller/kubeutils.py (74 lines of code) (raw):

# Copyright (c) 2020, 2022, Oracle and/or its affiliates. # # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ # import os import socket import sys import time from logging import Logger from typing import Callable, Optional, TypeVar from kubernetes.client.rest import ApiException from kubernetes import client, config try: # outside k8s config.load_kube_config() except config.config_exception.ConfigException: try: # inside a k8s pod config.load_incluster_config() except config.config_exception.ConfigException: raise Exception( "Could not configure kubernetes python client") api_core: client.CoreV1Api = client.CoreV1Api() api_customobj: client.CustomObjectsApi = client.CustomObjectsApi() api_apps: client.AppsV1Api = client.AppsV1Api() api_batch: client.BatchV1Api = client.BatchV1Api() api_cron_job: client.BatchV1Api = client.BatchV1Api() api_policy: client.PolicyV1Api = client.PolicyV1Api() api_rbac: client.RbacAuthorizationV1Api = client.RbacAuthorizationV1Api() api_client: client.ApiClient = client.ApiClient() api_apis: client.ApisApi() = client.ApisApi() T = TypeVar("T") def catch_404(f: Callable[..., T]) -> Optional[T]: try: return f() except ApiException as e: if e.status == 404: return None raise def available_apis(): return api_apis.get_api_versions() def k8s_version() -> str: api_instance = client.VersionApi(api_client) api_response = api_instance.get_code() return f"{api_response.major}.{api_response.minor}" _k8s_cluster_domain = None def k8s_cluster_domain(logger: Optional[Logger], ns="kube-system") -> str: """Get the Kubernetes Cluster's Domain. Can be overwritten using environment MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN. If this fails to detect it will retry in a blocking loop. This should only happen in operator_main before startup. If it constantly fails the process will be terminated. """ global _k8s_cluster_domain # We use the cached value instead of querying multiple times if _k8s_cluster_domain: return _k8s_cluster_domain # The user could override the lookup using env _k8s_cluster_domain = os.getenv("MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN") if _k8s_cluster_domain: if logger: logger.info(f"Environment provided cluster domain: {_k8s_cluster_domain}") return _k8s_cluster_domain for _ in range(15): try: # Try reverse lookup via some service having a cluster_ip set. Operator # is allowed to list all services and we assume some service is in # kube-system namespace. ip = next( filter( lambda ip: ip, map( lambda service: service.spec.cluster_ip, api_core.list_namespaced_service(ns).items ) ) ) if ip: fqdn = socket.gethostbyaddr(ip)[0] [_, _, _, _k8s_cluster_domain] = fqdn.split('.', maxsplit=3) if logger: logger.info(f"Auto-detected cluster domain: {_k8s_cluster_domain}") return _k8s_cluster_domain except Exception as e: if logger: logger.warning("Failed to detect cluster domain. " f"Reason: {e}") time.sleep(2) logger.error( """Failed to automatically identify the cluster domain. If this persists try setting MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN via environment.""" ) sys.exit(1)