// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build !aix

package kubernetes

import (
	"fmt"
	"sync"
	"time"

	"github.com/gofrs/uuid/v5"
	"k8s.io/apimachinery/pkg/runtime/schema"
	k8s "k8s.io/client-go/kubernetes"

	"github.com/elastic/elastic-agent-autodiscover/bus"
	"github.com/elastic/elastic-agent-autodiscover/kubernetes"
	"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
	"github.com/elastic/elastic-agent-autodiscover/utils"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
)
type pod struct {
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publishFunc func([]bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
replicasetWatcher kubernetes.Watcher
jobWatcher kubernetes.Watcher
// Mutex used by configuration updates not triggered by the main watcher,
// to avoid race conditions between cross updates and deletions.
// Other updaters must use a write lock.
crossUpdate sync.RWMutex
}
// NewPodEventer creates an eventer that can discover and process pod objects
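//
// A minimal, illustrative autodiscover configuration that would be unpacked into
// the Config consumed here (a sketch only; the field values shown are assumptions,
// not necessarily the defaults):
//
//	metricbeat.autodiscover:
//	  providers:
//	    - type: kubernetes
//	      scope: node
//	      hints.enabled: true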
func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
logger := logp.NewLogger("autodiscover.pod")
var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher
config := defaultConfig()
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
nd := &kubernetes.DiscoverKubernetesNodeParams{
ConfigHost: config.Node,
Client: client,
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
}
config.Node, err = kubernetes.DiscoverKubernetesNode(logger, nd)
if err != nil {
return nil, fmt.Errorf("couldn't discover kubernetes node due to error %w", err)
}
} else {
config.Node = ""
}
logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node)
watcher, err := kubernetes.NewNamedWatcher("pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err)
}
metaConf := config.AddResourceMetadata
	// We initialise the use_kubeadm variable based on the module's KubeAdm base configuration.
err = metaConf.Namespace.SetBool("use_kubeadm", -1, config.KubeAdm)
if err != nil {
logger.Errorf("couldn't set kubeadm variable for namespace due to error %+v", err)
}
err = metaConf.Node.SetBool("use_kubeadm", -1, config.KubeAdm)
if err != nil {
logger.Errorf("couldn't set kubeadm variable for node due to error %+v", err)
}
if metaConf.Node.Enabled() || config.Hints.Enabled() {
options := kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
HonorReSyncs: true,
}
nodeWatcher, err = kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
}
if metaConf.Namespace.Enabled() || config.Hints.Enabled() {
namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
}
	// The resource is a Pod, so we need to create watchers for the ReplicaSets and Jobs it might
	// belong to, in order to retrieve second-layer owner metadata, as in:
	// Deployment -> ReplicaSet -> Pod
	// CronJob -> Job -> Pod
if metaConf.Deployment {
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
logger.Errorf("Error creating metadata client due to error %+v", err)
}
replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
},
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
}
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)
p := &pod{
config: config,
uuid: uuid,
publishFunc: publish,
metagen: metaGen,
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
replicasetWatcher: replicaSetWatcher,
jobWatcher: jobWatcher,
}
watcher.AddEventHandler(p)
if nodeWatcher != nil && (config.Hints.Enabled() || metaConf.Node.Enabled()) {
updater := kubernetes.NewNodePodUpdater(p.unlockedUpdate, watcher.Store(), p.nodeWatcher, &p.crossUpdate)
nodeWatcher.AddEventHandler(updater)
}
if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) {
updater := kubernetes.NewNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), p.namespaceWatcher, &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}
return p, nil
}
// OnAdd ensures processing of pod objects that are newly added.
func (p *pod) OnAdd(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}
// OnUpdate handles events for pods that have been updated.
func (p *pod) OnUpdate(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()
p.unlockedUpdate(obj)
}
func (p *pod) unlockedUpdate(obj interface{}) {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
}
// OnDelete stops pod objects that are deleted.
func (p *pod) OnDelete(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()
p.logger.Debugf("Watcher Pod delete: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
}
// GenerateHints creates the hints needed by the hints builder.
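//
// As an illustrative example (assuming the default "co.elastic" prefix), pod
// annotations such as
//
//	co.elastic.logs/enabled: "true"
//	co.elastic.logs/multiline.pattern: "^ERROR"
//
// end up under "hints" in the returned event, which the Beat-specific builders
// then turn into input or module configurations.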
func (p *pod) GenerateHints(event bus.Event) bus.Event {
	// Try to build a config with the enabled builders. Send a provider-agnostic payload.
	// Builders are Beat-specific.
e := bus.Event{}
var kubeMeta, container mapstr.M
annotations := make(mapstr.M, 0)
rawMeta, found := event["kubernetes"]
if found {
kubeMetaMap, ok := rawMeta.(mapstr.M)
if ok {
kubeMeta = kubeMetaMap
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
anns, _ := rawAnn.(mapstr.M)
if len(anns) != 0 {
annotations = anns.Clone()
}
}
			// Look at all the namespace-level default annotations and merge them, with priority going to the pod annotations.
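			// For example (illustrative values), if the namespace default sets
			// co.elastic.logs/enabled: "false" and the pod sets it to "true", the pod
			// value is kept, because the merge below never overwrites pod-level keys.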
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
namespaceAnnotations, _ := rawNsAnn.(mapstr.M)
if len(namespaceAnnotations) != 0 {
annotations.DeepUpdateNoOverwrite(namespaceAnnotations)
}
}
}
}
if host, ok := event["host"]; ok {
e["host"] = host
}
if port, ok := event["port"]; ok {
e["port"] = port
}
if ports, ok := event["ports"]; ok {
e["ports"] = ports
}
if rawCont, found := kubeMeta["container"]; found {
if containerMap, ok := rawCont.(mapstr.M); ok {
container = containerMap
			// This ends up adding a runtime entry into the event, which ensures that we
			// don't attempt to spin up a docker input for a rkt container, and that rkt
			// containers are natively supported once a rkt input exists.
e["container"] = container
}
}
cname := utils.GetContainerName(container)
	// Generate hints based on the combination of both namespace and pod annotations.
hints, incorrecthints := utils.GenerateHints(annotations, cname, p.config.Prefix, true, AllSupportedHints)
	// Check whether each provided annotation follows the supported format and vocabulary. The check applies to annotations with the co.elastic prefix.
for _, value := range incorrecthints {
p.logger.Debugf("provided hint: %s/%s is not in the supported list", p.config.Prefix, value)
}
p.logger.Debugf("Generated hints %+v", hints)
if len(hints) != 0 {
e["hints"] = hints
}
p.logger.Debugf("Generated builder event %+v", e)
return e
}
// Start starts the eventer
func (p *pod) Start() error {
if p.nodeWatcher != nil {
err := p.nodeWatcher.Start()
if err != nil {
return err
}
}
if p.namespaceWatcher != nil {
if err := p.namespaceWatcher.Start(); err != nil {
return err
}
}
if p.replicasetWatcher != nil {
err := p.replicasetWatcher.Start()
if err != nil {
return err
}
}
if p.jobWatcher != nil {
err := p.jobWatcher.Start()
if err != nil {
return err
}
}
return p.watcher.Start()
}
// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()
if p.namespaceWatcher != nil {
p.namespaceWatcher.Stop()
}
if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}
if p.replicasetWatcher != nil {
p.replicasetWatcher.Stop()
}
if p.jobWatcher != nil {
p.jobWatcher.Stop()
}
}
// emit emits the events for the given pod according to its state and
// the given flag.
// It emits a pod event if the pod has at least one running container,
// and a container event for each one of the ports defined in each
// container.
// If a container doesn't have any defined port, it emits a single
// container event with "port" set to 0.
// "start" events are only generated for containers that have an id.
// "stop" events are always generated to ensure that configurations are
// deleted.
// If the pod is terminated, "stop" events are delayed during the grace
// period defined in `CleanupTimeout`.
// Network information is only included in events for running containers
// and for pods with at least one running container.
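//
// As a rough illustration, a pod with two containers that both have IDs, one
// declaring two ports and one declaring none, produces one pod event plus three
// container events (the port-less container gets a single event with port 0),
// with the pod event published first.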
func (p *pod) emit(pod *kubernetes.Pod, flag string) {
annotations := kubernetes.PodAnnotations(pod)
labels := kubernetes.PodLabels(pod)
namespaceAnnotations := kubernetes.PodNamespaceAnnotations(pod, p.namespaceWatcher)
eventList := make([][]bus.Event, 0)
portsMap := mapstr.M{}
containers := kubernetes.GetContainersInPod(pod)
anyContainerRunning := false
for _, c := range containers {
if c.Status.State.Running != nil {
anyContainerRunning = true
}
events, ports := p.containerPodEvents(flag, pod, c, annotations, namespaceAnnotations, labels)
if len(events) != 0 {
eventList = append(eventList, events)
}
if len(ports) > 0 {
portsMap.DeepUpdate(ports)
}
}
if len(eventList) != 0 {
event := p.podEvent(flag, pod, portsMap, anyContainerRunning, annotations, namespaceAnnotations, labels)
		// Ensure that the pod-level event is published first to avoid
		// pod metadata overriding valid container metadata.
eventList = append([][]bus.Event{{event}}, eventList...)
}
delay := (flag == "stop" && kubernetes.PodTerminated(pod, containers))
p.publishAll(eventList, delay)
}
// containerPodEvents creates the events for a container in a pod
// One event is created for each configured port. If there is no
// configured port, a single event is created, with the port set to 0.
// Host and port information is only included if the container is
// running.
// If the container ID is unknown, only "stop" events are generated.
// It also returns a map with the named ports.
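//
// The returned ports map is keyed by port name, e.g. {"http": 8080} for a running
// container declaring a named port (illustrative values).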
func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *kubernetes.ContainerInPod, annotations, namespaceAnnotations, labels mapstr.M) ([]bus.Event, mapstr.M) {
if c.ID == "" && flag != "stop" {
return nil, nil
}
	// This must be an id that doesn't depend on the state of the container,
	// so that it also works on `stop` when the containers have already been deleted.
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Spec.Name)
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Spec.Name))
cmeta := mapstr.M{
"id": c.ID,
"runtime": c.Runtime,
"image": mapstr.M{
"name": c.Spec.Image,
},
}
// Information that can be used in discovering a workload
kubemetaMap, _ := meta.GetValue("kubernetes")
kubemeta, _ := kubemetaMap.(mapstr.M)
kubemeta = kubemeta.Clone()
kubemeta["annotations"] = annotations
kubemeta["labels"] = labels
kubemeta["container"] = mapstr.M{
"id": c.ID,
"name": c.Spec.Name,
"image": c.Spec.Image,
"runtime": c.Runtime,
}
if len(namespaceAnnotations) != 0 {
kubemeta["namespace_annotations"] = namespaceAnnotations
}
ports := c.Spec.Ports
if len(ports) == 0 {
// Ensure that at least one event is generated for this container.
// Set port to zero to signify that the event is from a container
// and not from a pod.
ports = []kubernetes.ContainerPort{{ContainerPort: 0}}
}
events := []bus.Event{}
portsMap := mapstr.M{}
ShouldPut(meta, "container", cmeta, p.logger)
for _, port := range ports {
event := bus.Event{
"provider": p.uuid,
"id": eventID,
flag: true,
"kubernetes": kubemeta,
// Actual metadata that will enrich the event.
"meta": meta,
}
		// Include network information only if the container is running,
		// so templates that need network information don't generate a config.
if c.Status.State.Running != nil {
if port.Name != "" && port.ContainerPort != 0 {
portsMap[port.Name] = port.ContainerPort
}
event["host"] = pod.Status.PodIP
event["port"] = port.ContainerPort
}
events = append(events, event)
}
return events, portsMap
}
// podEvent creates an event for a pod.
// It only includes network information if `includeNetwork` is true.
func (p *pod) podEvent(flag string, pod *kubernetes.Pod, ports mapstr.M, includeNetwork bool, annotations, namespaceAnnotations, labels mapstr.M) bus.Event {
meta := p.metagen.Generate(pod)
// Information that can be used in discovering a workload
kubemetaMap, _ := meta.GetValue("kubernetes")
kubemeta, _ := kubemetaMap.(mapstr.M)
kubemeta = kubemeta.Clone()
kubemeta["annotations"] = annotations
kubemeta["labels"] = labels
if len(namespaceAnnotations) != 0 {
kubemeta["namespace_annotations"] = namespaceAnnotations
}
// Don't set a port on the event
event := bus.Event{
"provider": p.uuid,
"id": fmt.Sprint(pod.GetObjectMeta().GetUID()),
flag: true,
"kubernetes": kubemeta,
"meta": meta,
}
// Include network information only if the pod has an IP and there is any
// running container that could handle requests.
if pod.Status.PodIP != "" && includeNetwork {
event["host"] = pod.Status.PodIP
if len(ports) > 0 {
event["ports"] = ports
}
}
return event
}
// publishAll publishes all events in the event list in the same order. If delay is true,
// publishAll schedules the publication of the events after the configured `CleanupTimeout`
// and returns immediately.
// The order of published events matters, so this function always publishes a given eventList
// in the same goroutine.
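//
// For example (illustrative timing), with `CleanupTimeout: 60s` the "stop" events
// of a terminated pod are published roughly 60 seconds after the deletion is
// observed, keeping the generated configurations alive during the grace period.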
func (p *pod) publishAll(eventList [][]bus.Event, delay bool) {
if delay && p.config.CleanupTimeout > 0 {
p.logger.Debug("Publish will wait for the cleanup timeout")
time.AfterFunc(p.config.CleanupTimeout, func() {
p.publishAll(eventList, false)
})
return
}
for _, events := range eventList {
p.publishFunc(events)
}
}