pkg/helpers/k8s_utils.go (58 lines of code) (raw):

// Copyright (c) 2021, 2025, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package helpers import ( corev1 "k8s.io/api/core/v1" "os" ) // This file defines useful utility functions related to K8s // IsAppRunningInsideK8s returns if the application is running inside a K8s pod or not func IsAppRunningInsideK8s() bool { k8sHost := os.Getenv("KUBERNETES_SERVICE_HOST") k8sPort := os.Getenv("KUBERNETES_SERVICE_PORT") if len(k8sHost) > 0 && len(k8sPort) > 0 { // Host and Port are set return true } return false } // GetServiceAddressAndPort returns the IP or the hostname through which the service is available func GetServiceAddressAndPort(service *corev1.Service) (string, int32) { var servicePort int32 if len(service.Spec.Ports) > 0 { servicePort = service.Spec.Ports[0].Port } switch service.Spec.Type { case corev1.ServiceTypeLoadBalancer: if !IsAppRunningInsideK8s() { // The Application is running outside the K8s Cluster. // The service should be accessible via load balancer if the provider supports it. ingressPoints := service.Status.LoadBalancer.Ingress if len(ingressPoints) != 0 { ingressPoint := ingressPoints[0] if ingressPoint.IP != "" { return ingressPoint.IP, servicePort } else { return ingressPoint.Hostname, servicePort } } else { // ingress points not available return "", servicePort } } // The application is running inside the K8s Cluster - return ClusterIP fallthrough case corev1.ServiceTypeNodePort, corev1.ServiceTypeClusterIP: // The service should be accessible via ClusterIP. clusterIP := service.Spec.ClusterIP if clusterIP == corev1.ClusterIPNone { // this is a headless service return "", servicePort } return clusterIP, servicePort default: panic("service type not supported by GetServiceAddress yet") } } // GetCurrentNamespace returns the current namespace value, // on failure to fetch current namespace value it returns error. func GetCurrentNamespace() (string, error) { // First, it checks the environment for value set with CURRENT_NAMESPACE environment variable var currentNamespaceEnvVariable = "CURRENT_NAMESPACE" currentNamespace, variableFound := os.LookupEnv(currentNamespaceEnvVariable) if variableFound { return currentNamespace, nil } // Default namespace to be used by containers are placed in, // "/var/run/secrets/kubernetes.io/serviceaccount/namespace" file in each container namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") return string(namespace), err } func GetWatchNamespaceFromEnvironment() string { var watchNamespaceEnvVariable = "WATCH_NAMESPACE" namespace, _ := os.LookupEnv(watchNamespaceEnvVariable) return namespace }