pkg/client/client.go (355 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package client

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
	"github.com/elastic/elastic-agent-client/v7/pkg/utils"
)

// Action is an action the client exposes to the Elastic Agent.
//
// When the Elastic Agent requests an action, the handler is looked up by the
// value returned from Name.
type Action interface {
	// Name returns the name of the action.
	Name() string

	// Execute performs the action.
	//
	// The map argument holds the parameters decoded from the JSON sent with the
	// request. The returned map is JSON-encoded and sent back as the result; a
	// non-nil error is reported to the Elastic Agent as a failed action.
	Execute(context.Context, map[string]interface{}) (map[string]interface{}, error)
}

// StateInterface defines how to handle config and stop requests.
//
// The callbacks are invoked from the go routines started by the client, not
// from the go routine that called Start.
type StateInterface interface {
	// OnConfig is called when the Elastic Agent is requesting that the configuration
	// be set to the provided new value.
	OnConfig(string)

	// OnStop is called when the Elastic Agent is requesting the application to stop.
	OnStop()

	// OnError is called when an error occurs communicating with Elastic Agent.
	//
	// These error messages are not given by the Elastic Agent, they are just errors exposed
	// from the client-side GRPC connection.
	OnError(error)
}

// Client manages the state and communication to the Elastic Agent.
type Client interface {
	// Start starts the client.
	Start(ctx context.Context) error
	// Stop stops the client.
	Stop()
	// Status updates the status and sends it to the Elastic Agent.
	//
	// The payload, when non-nil, must be JSON-encodable; an encoding failure is
	// returned as the error.
	Status(status proto.StateObserved_Status, message string, payload map[string]interface{}) error
	// RegisterAction registers an action handler with the client under the
	// name returned by the action.
	RegisterAction(action Action)
	// UnregisterAction removes the action handler registered under the name
	// returned by the action.
	UnregisterAction(action Action)
}

// client manages the state and communication to the Elastic Agent.
type client struct { target string opts []grpc.DialOption token string impl StateInterface actions map[string]Action amx sync.RWMutex cfgIdx uint64 cfg string expected proto.StateExpected_State observed proto.StateObserved_Status observedMessage string observedPayload string ctx context.Context cancel context.CancelFunc wg sync.WaitGroup client proto.ElasticAgentClient cfgMu sync.RWMutex obsMu sync.RWMutex // overridden in tests to make fast minCheckTimeout time.Duration } // New creates a client connection to Elastic Agent. func New(target string, token string, impl StateInterface, actions []Action, opts ...grpc.DialOption) Client { actionMap := map[string]Action{} for _, act := range actions { actionMap[act.Name()] = act } return &client{ target: target, opts: opts, token: token, impl: impl, actions: actionMap, cfgIdx: InitialConfigIdx, expected: proto.StateExpected_RUNNING, observed: proto.StateObserved_STARTING, observedMessage: "Starting", minCheckTimeout: CheckinMinimumTimeout, } } func (c *client) RegisterAction(action Action) { c.amx.Lock() c.actions[action.Name()] = action c.amx.Unlock() } func (c *client) UnregisterAction(action Action) { c.amx.Lock() delete(c.actions, action.Name()) c.amx.Unlock() } // Start starts the connection to Elastic Agent. func (c *client) Start(ctx context.Context) error { c.ctx, c.cancel = context.WithCancel(ctx) conn, err := grpc.DialContext(ctx, c.target, c.opts...) if err != nil { return err } c.client = proto.NewElasticAgentClient(conn) c.startCheckin() c.startActions() return nil } // Stop stops the connection to Elastic Agent. func (c *client) Stop() { if c.cancel != nil { c.cancel() c.wg.Wait() c.ctx = nil c.cancel = nil } } // Status updates the current status of the client in the Elastic Agent. 
func (c *client) Status(status proto.StateObserved_Status, message string, payload map[string]interface{}) error { payloadStr := "" if payload != nil { payloadBytes, err := json.Marshal(payload) if err != nil { return err } payloadStr = string(payloadBytes) } c.obsMu.Lock() c.observed = status c.observedMessage = message c.observedPayload = payloadStr c.obsMu.Unlock() return nil } // startCheckin starts the go routines to send and receive check-ins // // This starts 3 go routines to manage the check-in bi-directional stream. The first // go routine starts the stream then starts one go routine to receive messages and // another go routine to send messages. The first go routine then blocks waiting on // the receive and send to finish, then restarts the stream or exits if the context // has been cancelled. func (c *client) startCheckin() { c.wg.Add(1) go func() { defer c.wg.Done() for { select { case <-c.ctx.Done(): // stopped return default: } c.checkinRoundTrip() } }() } func (c *client) checkinRoundTrip() { checkinCtx, checkinCancel := context.WithCancel(c.ctx) defer checkinCancel() checkinClient, err := c.client.Checkin(checkinCtx) if err != nil { c.impl.OnError(err) return } var checkinWG sync.WaitGroup done := make(chan bool) // expected state check-ins checkinWG.Add(1) go func() { defer checkinWG.Done() for { expected, err := checkinClient.Recv() if err != nil { if !errors.Is(err, io.EOF) { c.impl.OnError(err) } close(done) return } if c.expected == proto.StateExpected_STOPPING { // in stopping state, do nothing with any other expected states continue } if expected.State == proto.StateExpected_STOPPING { // Elastic Agent is requesting us to stop. c.expected = expected.State c.impl.OnStop() continue } if expected.ConfigStateIdx != c.cfgIdx { // Elastic Agent is requesting us to update config. 
c.cfgMu.Lock() c.cfgIdx = expected.ConfigStateIdx c.cfg = expected.Config c.cfgMu.Unlock() c.impl.OnConfig(expected.Config) continue } } }() // observed state check-ins checkinWG.Add(1) go func() { defer checkinWG.Done() var lastSent time.Time var lastSentCfgIdx uint64 var lastSentStatus proto.StateObserved_Status var lastSentMessage string var lastSentPayload string for { t := time.NewTimer(500 * time.Millisecond) select { case <-done: t.Stop() return case <-t.C: } c.cfgMu.RLock() cfgIdx := c.cfgIdx c.cfgMu.RUnlock() c.obsMu.RLock() observed := c.observed observedMsg := c.observedMessage observedPayload := c.observedPayload c.obsMu.RUnlock() sendMessage := func() error { err := checkinClient.Send(&proto.StateObserved{ Token: c.token, ConfigStateIdx: cfgIdx, Status: observed, Message: observedMsg, Payload: observedPayload, }) if err != nil { c.impl.OnError(err) return err } lastSent = time.Now() lastSentCfgIdx = cfgIdx lastSentStatus = observed lastSentMessage = observedMsg lastSentPayload = observedPayload return nil } // On start keep trying to send the initial check-in. if lastSent.IsZero() { if sendMessage() != nil { return } continue } // Send new status when it has changed. if lastSentCfgIdx != cfgIdx || lastSentStatus != observed || lastSentMessage != observedMsg || lastSentPayload != observedPayload { if sendMessage() != nil { return } continue } // Send when more than 30 seconds has passed without any status change. if time.Since(lastSent) >= c.minCheckTimeout { if sendMessage() != nil { return } continue } } }() // wait for both send and recv go routines to stop before // starting a new stream. checkinWG.Wait() checkinClient.CloseSend() } // startActions starts the go routines to send and receive actions // // This starts 3 go routines to manage the actions bi-directional stream. The first // go routine starts the stream then starts one go routine to receive messages and // another go routine to send messages. 
The first go routine then blocks waiting on // the receive and send to finish, then restarts the stream or exits if the context // has been cancelled. func (c *client) startActions() { c.wg.Add(1) // results are held outside of the retry loop, because on re-connect // we still want to send the responses that either failed or haven't been // sent back to the agent. actionResults := make(chan *proto.ActionResponse, 100) go func() { defer c.wg.Done() for { select { case <-c.ctx.Done(): // stopped return default: } c.actionRoundTrip(actionResults) } }() } func (c *client) actionRoundTrip(actionResults chan *proto.ActionResponse) { actionsCtx, actionsCancel := context.WithCancel(c.ctx) defer actionsCancel() actionsClient, err := c.client.Actions(actionsCtx) if err != nil { c.impl.OnError(err) return } var actionsWG sync.WaitGroup done := make(chan bool) // action requests actionsWG.Add(1) go func() { defer actionsWG.Done() for { action, err := actionsClient.Recv() if err != nil { if !errors.Is(err, io.EOF) { c.impl.OnError(err) } close(done) return } c.amx.RLock() actionImpl, ok := c.actions[action.Name] c.amx.RUnlock() if !ok { actionResults <- &proto.ActionResponse{ Token: c.token, Id: action.Id, Status: proto.ActionResponse_FAILED, Result: ActionErrUndefined, } continue } var params map[string]interface{} err = json.Unmarshal(action.Params, &params) if err != nil { actionResults <- &proto.ActionResponse{ Token: c.token, Id: action.Id, Status: proto.ActionResponse_FAILED, Result: ActionErrUnmarshableParams, } continue } // perform the action go func() { res, err := actionImpl.Execute(c.ctx, params) if err != nil { actionResults <- &proto.ActionResponse{ Token: c.token, Id: action.Id, Status: proto.ActionResponse_FAILED, Result: utils.JSONMustMarshal(map[string]string{ "error": err.Error(), }), } return } resBytes, err := json.Marshal(res) if err != nil { // client-side error, should have been marshal-able c.impl.OnError(err) actionResults <- &proto.ActionResponse{ 
Token: c.token, Id: action.Id, Status: proto.ActionResponse_FAILED, Result: ActionErrUnmarshableResult, } return } actionResults <- &proto.ActionResponse{ Token: c.token, Id: action.Id, Status: proto.ActionResponse_SUCCESS, Result: resBytes, } }() } }() // action responses actionsWG.Add(1) go func() { defer actionsWG.Done() // initial connection of stream must send the token so // the Elastic Agent knows this clients token. err := actionsClient.Send(&proto.ActionResponse{ Token: c.token, Id: ActionResponseInitID, Status: proto.ActionResponse_SUCCESS, Result: []byte("{}"), }) if err != nil { c.impl.OnError(err) return } for { select { case <-done: return case res := <-actionResults: err := actionsClient.Send(res) if err != nil { // failed to send, add back to response to try again actionResults <- res c.impl.OnError(err) return } } } }() // wait for both send and recv go routines to stop before // starting a new stream. actionsWG.Wait() actionsClient.CloseSend() }