kubernetes/util.go (269 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kubernetes
import (
"context"
"errors"
"fmt"
"os"
"strings"
"k8s.io/client-go/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/safemapstr"
)
type HostDiscoveryUtils interface {
GetNamespace() (string, error)
GetPodName() (string, error)
GetMachineID() string
}
// DiscoverKubernetesNodeParams includes parameters for discovering kubernetes node
type DiscoverKubernetesNodeParams struct {
ConfigHost string
Client kubernetes.Interface
IsInCluster bool
HostUtils HostDiscoveryUtils
}
// DefaultDiscoveryUtils implements functions of HostDiscoveryUtils interface
type DefaultDiscoveryUtils struct{}
func GetKubeConfigEnvironmentVariable() string {
envKubeConfig := os.Getenv("KUBECONFIG")
if _, err := os.Stat(envKubeConfig); !os.IsNotExist(err) {
return envKubeConfig
}
return ""
}
// GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.Interface, error) {
if kubeconfig == "" {
kubeconfig = GetKubeConfigEnvironmentVariable()
}
cfg, err := BuildConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("unable to build kube config due to error: %w", err)
}
cfg.QPS = opt.QPS
cfg.Burst = opt.Burst
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err)
}
return client, nil
}
// GetKubernetesMetadataClient returns a kubernetes metadata-only client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
func GetKubernetesMetadataClient(kubeconfig string, opt KubeClientOptions) (metadata.Interface, error) {
if kubeconfig == "" {
kubeconfig = GetKubeConfigEnvironmentVariable()
}
cfg, err := BuildConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("unable to build kube config due to error: %w", err)
}
cfg.QPS = opt.QPS
cfg.Burst = opt.Burst
client, err := metadata.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err)
}
return client, nil
}
// BuildConfig is a helper function that builds configs from a kubeconfig filepath.
// If kubeconfigPath is not passed in we fallback to inClusterConfig.
// If inClusterConfig fails, we fallback to the default config.
// This is a copy of `clientcmd.BuildConfigFromFlags` of `client-go` but without the annoying
// klog messages that are not possible to be disabled.
func BuildConfig(kubeconfigPath string) (*restclient.Config, error) {
if kubeconfigPath == "" {
kubeconfig, err := restclient.InClusterConfig()
if err == nil {
return kubeconfig, nil
}
}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: ""}}).ClientConfig()
}
// IsInCluster takes a kubeconfig file path as input and deduces if Beats is running in cluster or not,
// taking into consideration the existence of KUBECONFIG variable
func IsInCluster(kubeconfig string) bool {
if kubeconfig != "" || GetKubeConfigEnvironmentVariable() != "" {
return false
}
return true
}
// DiscoverKubernetesNode figures out the Kubernetes node to use.
// If host is provided in the config use it directly.
// If it is empty then try
// 1. If beat is deployed in k8s cluster, use hostname of pod as the pod name to query pod metadata for node name.
// 2. If step 1 fails or beat is deployed outside k8s cluster, use NODE_NAME env var.
// 3. If node cannot be discovered with step 1,2, use machine-id to match against k8s nodes for node name. In case it is not set return error.
// Note: There have been cases where machine-id reported by compute instances of some cloud providers where k8s nodes run on, has the wrong value.
func DiscoverKubernetesNode(log *logp.Logger, nd *DiscoverKubernetesNodeParams) (string, error) {
ctx := context.TODO()
// Discover node by configuration file (NODE) if set
if nd.ConfigHost != "" {
log.Infof("kubernetes: Using node %s provided in the config", nd.ConfigHost)
return nd.ConfigHost, nil
}
// Discover node by serviceaccount namespace and pod's hostname in case Beats is running in cluster
if nd.IsInCluster {
node, err := discoverInCluster(nd, ctx)
if err == nil {
log.Infof("kubernetes: Node %s discovered by in cluster pod node query", node)
return node, nil
}
log.Debug(err)
}
// use environment variable NODE_NAME
node := os.Getenv("NODE_NAME")
if node != "" {
log.Infof("kubernetes: Node %s discovered by NODE_NAME environment variable", node)
return node, nil
}
log.Debug(errors.New("NODE_NAME environment variable is not set"))
// try discover node by machine id
node, err := discoverByMachineID(nd, ctx)
if err == nil {
log.Infof("kubernetes: Node %s discovered by machine-id matching", node)
return node, nil
}
log.Debug(err)
return "", errors.New("kubernetes: Node could not be discovered with any known method. Consider setting env var NODE_NAME")
}
func discoverInCluster(nd *DiscoverKubernetesNodeParams, ctx context.Context) (node string, errorMsg error) {
ns, err := nd.HostUtils.GetNamespace()
if err != nil {
errorMsg = fmt.Errorf("kubernetes: Couldn't get namespace when beat is in cluster with error: %w", err)
return
}
podName, err := nd.HostUtils.GetPodName()
if err != nil {
errorMsg = fmt.Errorf("kubernetes: Couldn't get hostname as beat pod name in cluster with error: %w", err)
return
}
pod, err := nd.Client.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
errorMsg = fmt.Errorf("kubernetes: Querying for pod failed with error: %w", err)
return
}
return pod.Spec.NodeName, nil
}
func discoverByMachineID(nd *DiscoverKubernetesNodeParams, ctx context.Context) (nodeName string, errorMsg error) {
mid := nd.HostUtils.GetMachineID()
if mid == "" {
errorMsg = errors.New("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id")
return
}
nodes, err := nd.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
errorMsg = fmt.Errorf("kubernetes: Querying for nodes failed with error: %w", err)
return
}
for _, n := range nodes.Items {
if n.Status.NodeInfo.MachineID == mid {
nodeName = n.GetObjectMeta().GetName()
return nodeName, nil
}
}
errorMsg = fmt.Errorf("kubernetes: Couldn't discover node %s", mid)
return nodeName, errorMsg
}
// GetMachineID returns the machine-idadd_kubernetes_metadata/indexers_test.go
// borrowed from machineID of cadvisor.
func (hd *DefaultDiscoveryUtils) GetMachineID() string {
for _, file := range []string{
"/etc/machine-id",
"/var/lib/dbus/machine-id",
} {
id, err := os.ReadFile(file)
if err == nil {
return strings.TrimSpace(string(id))
}
}
return ""
}
// GetNamespace gets namespace from serviceaccount when beat is in cluster.
func (hd *DefaultDiscoveryUtils) GetNamespace() (string, error) {
return InClusterNamespace()
}
// GetPodName returns the hostname of the pod
func (hd *DefaultDiscoveryUtils) GetPodName() (string, error) {
return os.Hostname()
}
// InClusterNamespace gets namespace from serviceaccount when beat is in cluster. // code borrowed from client-go with some changes.
func InClusterNamespace() (string, error) {
// get namespace associated with the service account token, if available
data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", err
}
return strings.TrimSpace(string(data)), nil
}
type ContainerInPod struct {
ID string
Runtime string
Spec Container
Status PodContainerStatus
}
// GetContainersInPod returns all the containers defined in a pod and their statuses.
// It includes init and ephemeral containers.
func GetContainersInPod(pod *Pod) []*ContainerInPod {
containers := make([]*ContainerInPod, len(pod.Spec.Containers)+len(pod.Spec.InitContainers)+len(pod.Spec.EphemeralContainers))
idx := 0
for _, c := range pod.Spec.Containers {
containers[idx] = &ContainerInPod{Spec: c}
idx++
}
for _, c := range pod.Spec.InitContainers {
containers[idx] = &ContainerInPod{Spec: c}
idx++
}
for _, c := range pod.Spec.EphemeralContainers {
c := Container(c.EphemeralContainerCommon)
containers[idx] = &ContainerInPod{Spec: c}
idx++
}
statuses := make(map[string]*PodContainerStatus)
mapStatuses := func(s []PodContainerStatus) {
for i := range s {
statuses[s[i].Name] = &s[i]
}
}
mapStatuses(pod.Status.ContainerStatuses)
mapStatuses(pod.Status.InitContainerStatuses)
mapStatuses(pod.Status.EphemeralContainerStatuses)
for _, c := range containers {
if s, ok := statuses[c.Spec.Name]; ok {
c.ID, c.Runtime = ContainerIDWithRuntime(*s)
c.Status = *s
}
}
return containers
}
// PodLabels returns the labels in a pod
func PodLabels(pod *Pod) mapstr.M {
labels := mapstr.M{}
for k, v := range pod.GetObjectMeta().GetLabels() {
_ = safemapstr.Put(labels, k, v)
}
return labels
}
// PodAnnotations returns the annotations in a pod
func PodAnnotations(pod *Pod) mapstr.M {
annotations := mapstr.M{}
for k, v := range pod.GetObjectMeta().GetAnnotations() {
_ = safemapstr.Put(annotations, k, v)
}
return annotations
}
// PodNamespaceAnnotations returns the annotations of the namespace of the pod
func PodNamespaceAnnotations(pod *Pod, watcher Watcher) mapstr.M {
if watcher == nil {
return nil
}
rawNs, ok, err := watcher.Store().GetByKey(pod.Namespace)
if !ok || err != nil {
return nil
}
namespace, ok := rawNs.(*Namespace)
if !ok {
return nil
}
annotations := mapstr.M{}
for k, v := range namespace.GetAnnotations() {
_ = safemapstr.Put(annotations, k, v)
}
return annotations
}
// PodTerminating returns true if a pod is marked for deletion or is in a phase beyond running.
func PodTerminating(pod *Pod) bool {
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
return true
}
switch pod.Status.Phase {
case PodRunning, PodPending:
default:
return true
}
return false
}
// PodTerminated returns true if a pod is terminated, this method considers a
// pod as terminated if none of its containers are running (or going to be running).
func PodTerminated(pod *Pod, containers []*ContainerInPod) bool {
// Pod is not marked for termination, so it is not terminated.
if !PodTerminating(pod) {
return false
}
// If any container is running, the pod is not terminated yet.
for _, container := range containers {
if container.Status.State.Running != nil {
return false
}
}
return true
}