// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build linux || darwin || windows

package kubernetes

import (
"context"
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"github.com/gofrs/uuid/v5"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/autodiscover/template"
"github.com/elastic/elastic-agent-autodiscover/bus"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/k8skeystore"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/keystore"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
func init() {
err := autodiscover.Registry.AddProvider("kubernetes", AutodiscoverBuilder)
if err != nil {
logp.Error(fmt.Errorf("could not add `kubernetes` provider"))
}
}
// Eventer allows defining ways in which kubernetes resource events are observed and processed
type Eventer interface {
kubernetes.ResourceEventHandler
GenerateHints(event bus.Event) bus.Event
Start() error
Stop()
}
// EventManager abstracts the event source driving the autodiscover provider (a resource eventer or leader election), exposing its start/stop lifecycle and hint generation
type EventManager interface {
GenerateHints(event bus.Event) bus.Event
Start()
Stop()
}
// Provider implements the autodiscover provider for kubernetes resources
type Provider struct {
config *Config
bus bus.Bus
templates template.Mapper
builders autodiscover.Builders
appenders autodiscover.Appenders
logger *logp.Logger
eventManager EventManager
}
// eventerManager implements start/stop methods for autodiscover provider with resource eventer
type eventerManager struct {
eventer Eventer
logger *logp.Logger
}
// leaderElectionManager implements start/stop methods for autodiscover provider with leaderElection
type leaderElectionManager struct {
leaderElection leaderelection.LeaderElectionConfig
cancelLeaderElection context.CancelFunc
logger *logp.Logger
}
// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(
beatName string,
bus bus.Bus,
uuid uuid.UUID,
c *config.C,
keystore keystore.Keystore,
) (autodiscover.Provider, error) {
logger := logp.NewLogger("autodiscover")
errWrap := func(err error) error {
return fmt.Errorf("error setting up kubernetes autodiscover provider: %w", err)
}
config := defaultConfig()
config.LeaderLease = fmt.Sprintf("%v-cluster-leader", beatName)
err := c.Unpack(&config)
if err != nil {
return nil, errWrap(err)
}
client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
return nil, errWrap(err)
}
k8sKeystoreProvider := k8skeystore.NewKubernetesKeystoresRegistry(logger, client)
mapper, err := template.NewConfigMapper(config.Templates, keystore, k8sKeystoreProvider)
if err != nil {
return nil, errWrap(err)
}
builders, err := autodiscover.NewBuilders(config.Builders, config.Hints, k8sKeystoreProvider)
if err != nil {
return nil, errWrap(err)
}
appenders, err := autodiscover.NewAppenders(config.Appenders)
if err != nil {
return nil, errWrap(err)
}
p := &Provider{
config: config,
bus: bus,
templates: mapper,
builders: builders,
appenders: appenders,
logger: logger,
}
if p.config.Unique {
p.eventManager, err = NewLeaderElectionManager(uuid, config, client, p.startLeading, p.stopLeading, logger)
} else {
p.eventManager, err = NewEventerManager(uuid, c, config, client, p.publish)
}
if err != nil {
return nil, errWrap(err)
}
return p, nil
}
// Start for Runner interface.
func (p *Provider) Start() {
p.eventManager.Start()
}
// Stop signals the stop channel to force the watch loop routine to stop.
func (p *Provider) Stop() {
p.eventManager.Stop()
}
// String returns a description of kubernetes autodiscover provider.
func (p *Provider) String() string {
return "kubernetes"
}
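// publish takes a batch of events that all describe the same Kubernetes resource, resolves a
// configuration for each of them from the templates or the hints-based builders, and publishes a
// single bus event carrying all resolved configs after running the appenders on it.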
func (p *Provider) publish(events []bus.Event) {
if len(events) == 0 {
return
}
configs := make([]*config.C, 0)
id := events[0]["id"]
for _, event := range events {
// Ensure that all events have the same ID; if not, panic
if event["id"] != id {
panic("events from Kubernetes can't have different id fields")
}
// Try to match a config
if config := p.templates.GetConfig(event); config != nil {
configs = append(configs, config...)
} else {
// If there isn't a default template then attempt to use builders
e := p.eventManager.GenerateHints(event)
if config := p.builders.GetConfig(e); config != nil {
configs = append(configs, config...)
}
}
}
// Since all the events share the same ID, pick one of them and attach all the collected configs
event := bus.Event(mapstr.M(events[0]).Clone())
// Remove the port to avoid ambiguity during debugging
delete(event, "port")
event["config"] = configs
// Call all appenders to append any extra configuration
p.appenders.Append(event)
p.bus.Publish(event)
}
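// startLeading is called when this instance acquires the leader election lock. It publishes a
// start event for the cluster-wide (unique) configuration, attaching any matching template config.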
func (p *Provider) startLeading(uuid string, eventID string) {
event := bus.Event{
"start": true,
"provider": uuid,
"id": eventID,
"unique": "true",
}
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
}
p.bus.Publish(event)
}
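// stopLeading is called when this instance loses the leader election lock. It publishes the
// corresponding stop event so the cluster-wide (unique) configuration is shut down.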
func (p *Provider) stopLeading(uuid string, eventID string) {
event := bus.Event{
"stop": true,
"provider": uuid,
"id": eventID,
"unique": "true",
}
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
}
p.bus.Publish(event)
}
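// NewEventerManager creates an EventManager backed by a resource eventer for the configured
// resource type (pod, node or service).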
func NewEventerManager(
uuid uuid.UUID,
c *config.C,
cfg *Config,
client k8s.Interface,
publish func(event []bus.Event),
) (EventManager, error) {
var err error
em := &eventerManager{}
switch cfg.Resource {
case "pod":
em.eventer, err = NewPodEventer(uuid, c, client, publish)
case "node":
em.eventer, err = NewNodeEventer(uuid, c, client, publish)
case "service":
em.eventer, err = NewServiceEventer(uuid, c, client, publish)
default:
return nil, fmt.Errorf("unsupported autodiscover resource %s", cfg.Resource)
}
if err != nil {
return nil, err
}
return em, nil
}
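// NewLeaderElectionManager creates an EventManager that uses Kubernetes leader election over a
// Lease resource, so that only the instance currently holding the lease runs the cluster-wide
// (unique) configuration.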
func NewLeaderElectionManager(
uuid uuid.UUID,
cfg *Config,
client k8s.Interface,
startLeading func(uuid string, eventID string),
stopLeading func(uuid string, eventID string),
logger *logp.Logger,
) (EventManager, error) {
lem := &leaderElectionManager{logger: logger}
var id string
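// Use the node name as the leader election identity when it is known, otherwise fall back to
// the provider UUID.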
if cfg.Node != "" {
id = "beats-leader-" + cfg.Node
} else {
id = "beats-leader-" + uuid.String()
}
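// Place the Lease in the in-cluster namespace, falling back to "default" when it cannot be detected.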
ns, err := kubernetes.InClusterNamespace()
if err != nil {
ns = "default"
}
lease := metav1.ObjectMeta{
Name: cfg.LeaderLease,
Namespace: ns,
}
var eventID string
leaseID := lease.Name + "-" + lease.Namespace
lem.leaderElection = leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: lease,
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
},
ReleaseOnCancel: true,
LeaseDuration: cfg.LeaseDuration,
RenewDeadline: cfg.RenewDeadline,
RetryPeriod: cfg.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
eventID = fmt.Sprintf("%v-%v", leaseID, time.Now().UnixNano())
logger.Debugf("leader election lock GAINED, holder: %v, eventID: %v", id, eventID)
startLeading(uuid.String(), eventID)
},
OnStoppedLeading: func() {
logger.Debugf("leader election lock LOST, holder: %v, eventID: %v", id, eventID)
stopLeading(uuid.String(), eventID)
},
},
}
return lem, nil
}
// Start for EventManager interface.
func (p *eventerManager) Start() {
if err := p.eventer.Start(); err != nil {
p.logger.Errorf("Error starting kubernetes autodiscover provider: %s", err)
}
}
// Stop signals the stop channel to force the watch loop routine to stop.
func (p *eventerManager) Stop() {
p.eventer.Stop()
}
// GenerateHints for EventManager interface.
func (p *eventerManager) GenerateHints(event bus.Event) bus.Event {
return p.eventer.GenerateHints(event)
}
// Start for EventManager interface.
func (p *leaderElectionManager) Start() {
ctx, cancel := context.WithCancel(context.TODO())
p.cancelLeaderElection = cancel
p.startLeaderElectorIndefinitely(ctx, p.leaderElection)
}
// Stop cancels the leader election context, stopping the leader election loop.
func (p *leaderElectionManager) Stop() {
if p.cancelLeaderElection != nil {
p.cancelLeaderElection()
}
}
// GenerateHints for EventManager interface.
func (p *leaderElectionManager) GenerateHints(event bus.Event) bus.Event {
return event
}
// startLeaderElectorIndefinitely starts a Leader Elector in the background with the provided config.
// If this instance gets the lease lock and later loses it, we run the leader elector again.
func (p *leaderElectionManager) startLeaderElectorIndefinitely(ctx context.Context, lec leaderelection.LeaderElectionConfig) {
le, err := leaderelection.NewLeaderElector(lec)
if err != nil {
p.logger.Errorf("error while creating Leader Elector: %v", err)
return
}
p.logger.Debugf("Starting Leader Elector")
go func() {
for {
le.Run(ctx)
select {
case <-ctx.Done():
return
default:
// Run returned because the lease was lost. Run the leader elector again, so this instance
// is still a candidate to get the lease.
}
}
}()
}
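// ShouldPut sets field to value on the event, logging a debug message instead of failing if the
// field cannot be set.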
func ShouldPut(event mapstr.M, field string, value interface{}, logger *logp.Logger) {
_, err := event.Put(field, value)
if err != nil {
logger.Debugf("Failed to put field '%s' with value '%v': %s", field, value, err)
}
}
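// ShouldDelete removes field from the event, logging a debug message if the deletion fails.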
func ShouldDelete(event mapstr.M, field string, logger *logp.Logger) {
err := event.Delete(field)
if err != nil {
logger.Debugf("Failed to delete field '%s': %s", field, err)
}
}