internal/pkg/action/dispatcher.go (129 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. // Package action is used to dispatch actions read from elasticsearch to elastic-agents package action import ( "context" "sync" "time" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/rs/zerolog" "golang.org/x/time/rate" ) // Sub is an action subscription that will give a single agent all of it's actions. type Sub struct { agentID string seqNo sqn.SeqNo ch chan []model.Action } // Ch returns the emitter channel for actions. func (s Sub) Ch() chan []model.Action { return s.ch } // Dispatcher tracks agent subscriptions and emits actions to the subscriptions. type Dispatcher struct { am monitor.SimpleMonitor limit *rate.Limiter mx sync.RWMutex subs map[string]Sub } // NewDispatcher creates a Dispatcher using the provided monitor. func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher { r := rate.Inf if throttle > 0 { r = rate.Every(throttle) } return &Dispatcher{ am: am, limit: rate.NewLimiter(r, i), subs: make(map[string]Sub), } } // Run starts the Dispatcher. // After the Dispatcher is started subscriptions may receive actions. // Subscribe may be called before or after Run. func (d *Dispatcher) Run(ctx context.Context) error { for { select { case <-ctx.Done(): return nil case hits := <-d.am.Output(): d.process(ctx, hits) } } } // Subscribe generates a new subscription with the Dispatcher using the provided agentID and seqNo. // There is no check to ensure that the agentID has not been used; using the same one twice results in undefined behaviour. func (d *Dispatcher) Subscribe(log zerolog.Logger, agentID string, seqNo sqn.SeqNo) *Sub { cbCh := make(chan []model.Action, 1) sub := Sub{ agentID: agentID, seqNo: seqNo, ch: cbCh, } d.mx.Lock() d.subs[agentID] = sub sz := len(d.subs) d.mx.Unlock() log.Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher") return &sub } // Unsubscribe removes the given subscription from the dispatcher. // Note that the channel sub.Ch() provides is not closed in this event. func (d *Dispatcher) Unsubscribe(log zerolog.Logger, sub *Sub) { if sub == nil { return } d.mx.Lock() delete(d.subs, sub.agentID) sz := len(d.subs) d.mx.Unlock() log.Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher") } // process gathers actions from the monitor and dispatches them to the corresponding subscriptions. func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) { // Parse hits into map of agent -> actions // Actions are ordered by sequence agentActions := make(map[string][]model.Action) for _, hit := range hits { var action model.Action err := hit.Unmarshal(&action) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal action document") break } numAgents := len(action.Agents) for i, agentID := range action.Agents { arr := agentActions[agentID] actionNoAgents := action actionNoAgents.StartTime = offsetStartTime(ctx, action.StartTime, action.RolloutDurationSeconds, i, numAgents) actionNoAgents.Agents = nil arr = append(arr, actionNoAgents) agentActions[agentID] = arr } } for agentID, actions := range agentActions { if err := d.limit.Wait(ctx); err != nil { zerolog.Ctx(ctx).Warn().Err(err).Msg("action dispatcher rate limit error") return } d.dispatch(ctx, agentID, actions) } } // offsetStartTime will return a new start time between start:start+dur based on index i and the total number of agents // As we expect i < total the latest return time will always be < start+dur func offsetStartTime(ctx context.Context, start string, dur int64, i, total int) string { if start == "" { return "" } startTS, err := time.Parse(time.RFC3339, start) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("unable to parse start_time string") return "" } d := time.Second * time.Duration(dur) startTS = startTS.Add((d * time.Duration(i)) / time.Duration(total)) // adjust start to a position within the range return startTS.Format(time.RFC3339) } // getSub returns the subscription (if any) for the specified agentID. func (d *Dispatcher) getSub(agentID string) (Sub, bool) { d.mx.RLock() sub, ok := d.subs[agentID] d.mx.RUnlock() return sub, ok } // dispatch passes the actions into the subscription channel as a non-blocking operation. // It may drop actions that will be re-sent to the agent on its next check in. func (d *Dispatcher) dispatch(ctx context.Context, agentID string, acdocs []model.Action) { sub, ok := d.getSub(agentID) if !ok { zerolog.Ctx(ctx).Debug().Str(logger.AgentID, agentID).Msg("Agent is not currently connected. Not dispatching actions.") return } select { case sub.Ch() <- acdocs: default: // This prevents action dispatch blocking when the agent subscription channel is full // in the case when the agent request loop received the actions on long poll but didn't unsubscribe // from the dispatcher. // It is safe to drop them since the agent already has actions and will come around on the next check-in to pick up these new actions. } }