internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package fleet

import (
"context"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"

eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/otel/otelhelpers"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// maxUnauthCounter is the maximum number of consecutive unauthorized (invalid API key)
// responses tolerated before the gateway switches to the long checkin schedule.
const maxUnauthCounter int = 6

// Agent states reported to Fleet during checkin.
const (
fleetStateDegraded = "DEGRADED"
fleetStateOnline = "online"
fleetStateError = "error"
fleetStateStarting = "starting"
)

// Default backoff settings used when retrying failed checkins with Fleet Server.
var defaultFleetBackoffSettings = backoffSettings{
Init: 60 * time.Second,
Max: 10 * time.Minute,
}

// Default configuration for the Fleet gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrConsecutiveUnauthDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Backoff: &defaultFleetBackoffSettings,
}
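
// fleetGatewaySettings holds the timing configuration used by the gateway when
// checking in with Fleet Server.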
type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff *backoffSettings `config:"backoff"`
ErrConsecutiveUnauthDuration time.Duration
}
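
// backoffSettings configures the exponential backoff applied between failed checkin attempts.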
type backoffSettings struct {
Init time.Duration `config:"init"`
Max time.Duration `config:"max"`
}
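
// agentInfo provides the agent ID used to identify this agent to Fleet Server.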
type agentInfo interface {
AgentID() string
}
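
// stateStore persists the ack token returned by Fleet Server between checkins.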
type stateStore interface {
AckToken() string
SetAckToken(ackToken string)
Save() error
}
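
// FleetGateway is the gateway between the Agent and Fleet Server. It periodically
// checks in with Fleet Server, reporting the current agent and component state, and
// publishes any actions returned by Fleet on the Actions channel.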
type FleetGateway struct {
log *logger.Logger
client client.Sender
scheduler scheduler.Scheduler
settings *fleetGatewaySettings
agentInfo agentInfo
acker acker.Acker
unauthCounter int
checkinFailCounter int
stateFetcher func() coordinator.State
stateStore stateStore
errCh chan error
actionCh chan []fleetapi.Action
}

// New creates a new Fleet gateway using the default gateway settings and a
// periodic jittered scheduler.
func New(
log *logger.Logger,
agentInfo agentInfo,
client client.Sender,
acker acker.Acker,
stateFetcher func() coordinator.State,
stateStore stateStore,
) (*FleetGateway, error) {
scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
log,
defaultGatewaySettings,
agentInfo,
client,
scheduler,
acker,
stateFetcher,
stateStore,
)
}
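
// newFleetGatewayWithScheduler creates a FleetGateway with the provided settings and scheduler.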
func newFleetGatewayWithScheduler(
log *logger.Logger,
settings *fleetGatewaySettings,
agentInfo agentInfo,
client client.Sender,
scheduler scheduler.Scheduler,
acker acker.Acker,
stateFetcher func() coordinator.State,
stateStore stateStore,
) (*FleetGateway, error) {
return &FleetGateway{
log: log,
client: client,
settings: settings,
agentInfo: agentInfo,
scheduler: scheduler,
acker: acker,
stateFetcher: stateFetcher,
stateStore: stateStore,
errCh: make(chan error),
actionCh: make(chan []fleetapi.Action, 1),
}, nil
}
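
// Actions returns the channel on which actions received from Fleet Server are published.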
func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
return f.actionCh
}
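
// Run starts the checkin loop and blocks until ctx is cancelled. On every scheduler
// tick the gateway checks in with Fleet Server and publishes any returned actions.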
func (f *FleetGateway) Run(ctx context.Context) error {
var requestBackoff backoff.Backoff
if f.settings.Backoff == nil {
requestBackoff = RequestBackoff(ctx.Done())
} else {
// this is only used in tests
requestBackoff = backoff.NewEqualJitterBackoff(
ctx.Done(),
f.settings.Backoff.Init,
f.settings.Backoff.Max,
)
}
f.log.Info("Fleet gateway started")
for {
select {
case <-ctx.Done():
f.scheduler.Stop()
f.log.Info("Fleet gateway stopped")
return ctx.Err()
case <-f.scheduler.WaitTick():
f.log.Debug("FleetGateway calling Checkin API")
// Execute the checkin call. Any error returned by the fleet-server API is retried
// with an exponential backoff and some jitter to help distribute the load from a
// fleet of agents.
resp, err := f.doExecute(ctx, requestBackoff)
if err != nil {
continue
}
actions := make([]fleetapi.Action, len(resp.Actions))
copy(actions, resp.Actions)
if len(actions) > 0 {
f.actionCh <- actions
}
}
}
}

// Errors returns the channel to watch for reported errors.
func (f *FleetGateway) Errors() <-chan error {
return f.errCh
}
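
// doExecute performs a checkin with Fleet Server, retrying failed attempts with the
// provided backoff until a checkin succeeds or ctx is cancelled.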
func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
bo.Reset()
// Guard against the context being cancelled by an out-of-band call: this means we
// are restarting to change the log level or the system is shutting us down.
for ctx.Err() == nil {
f.log.Debugf("Checking started")
resp, took, err := f.execute(ctx)
if err != nil {
f.checkinFailCounter++
// Report the first two failures at warn level as they may be recoverable with retries.
if f.checkinFailCounter <= 2 {
f.log.Warnw("Possible transient error during checkin with fleet-server, retrying",
"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
"retry_after_ns", bo.NextWait())
} else {
f.log.Errorw("Cannot checkin in with fleet-server, retrying",
"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
"retry_after_ns", bo.NextWait())
}
if !bo.Wait() {
if ctx.Err() != nil {
// if the context is cancelled, break out of the loop
break
}
// This should not really happen, but just in case: this error signals that
// something strange occurred, and we want to log and report it.
err := errors.New(
"checkin retry loop was stopped",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, f.client.URI()),
)
f.log.Error(err)
f.errCh <- err
return nil, err
}
f.errCh <- err
continue
}
if f.checkinFailCounter > 0 {
// Log the recovery at warn level, matching the transient-failure warnings above,
// so that the subsequent success is visible in the logs.
f.log.Warnf("Checkin request to fleet-server succeeded after %d failures", f.checkinFailCounter)
}
f.checkinFailCounter = 0
if resp.FleetWarning != "" {
f.errCh <- coordinator.NewWarningError(resp.FleetWarning)
} else {
f.errCh <- nil
}
// Request was successful, return the collected actions.
return resp, nil
}
// This means the loop exited because the context was cancelled; return the error
// but do not log it, because we are in the process of shutting down.
return nil, ctx.Err()
}
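
// convertToCheckinComponents converts the coordinator component states and the OTel
// collector status into the component payload reported to Fleet Server on checkin.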
func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState, collector *status.AggregateStatus) []fleetapi.CheckinComponent {
if components == nil {
return nil
}
stateString := func(s eaclient.UnitState) string {
if state := s.String(); state != "UNKNOWN" {
return state
}
return ""
}
unitTypeString := func(t eaclient.UnitType) string {
if typ := t.String(); typ != "unknown" {
return typ
}
return ""
}
size := len(components)
if collector != nil {
size += len(collector.ComponentStatusMap)
}
checkinComponents := make([]fleetapi.CheckinComponent, 0, size)
for _, item := range components {
component := item.Component
state := item.State
checkinComponent := fleetapi.CheckinComponent{
ID: component.ID,
Type: component.Type(),
Status: stateString(state.State),
Message: state.Message,
}
if state.Units != nil {
units := make([]fleetapi.CheckinUnit, 0, len(state.Units))
for unitKey, unitState := range state.Units {
units = append(units, fleetapi.CheckinUnit{
ID: unitKey.UnitID,
Type: unitTypeString(unitKey.UnitType),
Status: stateString(unitState.State),
Message: unitState.Message,
Payload: unitState.Payload,
})
}
checkinComponent.Units = units
}
checkinComponents = append(checkinComponents, checkinComponent)
}
// The OTel collector status is reported as one checkin component per top-level
// OTel component, with each of its subcomponents reported as a unit.
if collector != nil {
for id, item := range collector.ComponentStatusMap {
state, msg := otelhelpers.StateWithMessage(item)
checkinComponent := fleetapi.CheckinComponent{
ID: id,
Type: "otel",
Status: stateString(state),
Message: msg,
}
if len(item.ComponentStatusMap) > 0 {
units := make([]fleetapi.CheckinUnit, 0, len(item.ComponentStatusMap))
for unitId, unitItem := range item.ComponentStatusMap {
unitState, unitMsg := otelhelpers.StateWithMessage(unitItem)
units = append(units, fleetapi.CheckinUnit{
ID: unitId,
Status: stateString(unitState),
Message: unitMsg,
})
}
checkinComponent.Units = units
}
checkinComponents = append(checkinComponents, checkinComponent)
}
}
return checkinComponents
}
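
// execute performs a single checkin request against Fleet Server, reporting the
// current agent metadata and state, and returns the response together with the
// request duration.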
func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata(ctx, f.log)
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
return nil, 0, err
}
// retrieve ack token from the store
ackToken := f.stateStore.AckToken()
if ackToken != "" {
f.log.Debugf("using previously saved ack token: %v", ackToken)
}
// get current state
state := f.stateFetcher()
// convert components into checkin components structure
components := f.convertToCheckinComponents(state.Components, state.Collector)
f.log.Debugf("correcting agent loglevel from %s to %s using coordinator state", ecsMeta.Elastic.Agent.LogLevel, state.LogLevel.String())
// Correct the reported log level using the current level from the coordinator
ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String()
// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
}
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++
if f.shouldUseLongSched() {
f.log.Warnf("retrieved an invalid api key error '%d' times. will use long scheduler", f.unauthCounter)
f.scheduler.SetDuration(defaultGatewaySettings.ErrConsecutiveUnauthDuration)
return &fleetapi.CheckinResponse{}, took, nil
}
return nil, took, err
}
f.scheduler.SetDuration(defaultGatewaySettings.Duration)
f.unauthCounter = 0
if err != nil {
return nil, took, err
}
// Save the latest ackToken
if resp.AckToken != "" {
f.stateStore.SetAckToken(resp.AckToken)
serr := f.stateStore.Save()
if serr != nil {
f.log.Errorf("failed to save the ack token, err: %v", serr)
}
}
return resp, took, nil
}

// shouldUseLongSched reports whether the number of consecutive unauthorized responses
// has exceeded maxUnauthCounter, meaning the gateway should switch to the long
// checkin schedule.
func (f *FleetGateway) shouldUseLongSched() bool {
return f.unauthCounter > maxUnauthCounter
}
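
// isUnauth reports whether err is an invalid API key (unauthorized) error.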
func isUnauth(err error) bool {
return errors.Is(err, client.ErrInvalidAPIKey)
}
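
// SetClient replaces the sender used to communicate with Fleet Server.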
func (f *FleetGateway) SetClient(c client.Sender) {
f.client = c
}
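
// agentStateToString maps the agent's internal state to the status string reported
// to Fleet on checkin.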
func agentStateToString(state agentclient.State) string {
switch state {
case agentclient.Healthy:
return fleetStateOnline
case agentclient.Failed:
return fleetStateError
case agentclient.Starting:
return fleetStateStarting
case agentclient.Configuring:
return fleetStateOnline
case agentclient.Upgrading:
return fleetStateOnline
case agentclient.Rollback:
return fleetStateDegraded
case agentclient.Degraded:
return fleetStateDegraded
// Report Stopping and Stopped as online since Fleet doesn't understand these states yet.
// Usually Stopping and Stopped mean the agent is going to stop checking in at which point Fleet
// will update the state to offline. Use the online state here since there isn't anything better
// at the moment, and the agent will end up in the expected offline state eventually.
case agentclient.Stopping:
return fleetStateOnline
case agentclient.Stopped:
return fleetStateOnline
}
// Unknown states map to degraded.
return fleetStateDegraded
}
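
// RequestBackoff returns the default equal-jitter backoff used between failed
// checkin attempts.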
func RequestBackoff(done <-chan struct{}) backoff.Backoff {
return backoff.NewEqualJitterBackoff(
done,
defaultFleetBackoffSettings.Init,
defaultFleetBackoffSettings.Max,
)
}