pkg/control/v2/client/client.go (454 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"time"
"google.golang.org/grpc"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)
// UnitType is the type of the unit
type UnitType = cproto.UnitType
// State is the state codes
type State = cproto.State
// CollectorComponentStatus is the status of a collector component
type CollectorComponentStatus = cproto.CollectorComponentStatus
// AdditionalMetrics is the type for additional diagnostic requests
type AdditionalMetrics = cproto.AdditionalDiagnosticRequest
const (
// UnitTypeInput is an input unit.
UnitTypeInput UnitType = cproto.UnitType_INPUT
// UnitTypeOutput is an output unit.
UnitTypeOutput UnitType = cproto.UnitType_OUTPUT
)
const (
// Starting is when the it is still starting.
Starting State = cproto.State_STARTING
// Configuring is when it is configuring.
Configuring State = cproto.State_CONFIGURING
// Healthy is when it is healthy.
Healthy State = cproto.State_HEALTHY
// Degraded is when it is degraded.
Degraded State = cproto.State_DEGRADED
// Failed is when it is failed.
Failed State = cproto.State_FAILED
// Stopping is when it is stopping.
Stopping State = cproto.State_STOPPING
// Stopped is when it is stopped.
Stopped State = cproto.State_STOPPED
// Upgrading is when it is upgrading.
Upgrading State = cproto.State_UPGRADING
// Rollback is when it is upgrading is rolling back.
Rollback State = cproto.State_ROLLBACK
)
const (
// CollectorComponentStatusNone is when the collector component doesn't report a status.
CollectorComponentStatusNone CollectorComponentStatus = cproto.CollectorComponentStatus_StatusNone
// CollectorComponentStatusStarting is when the collector component is starting.
CollectorComponentStatusStarting CollectorComponentStatus = cproto.CollectorComponentStatus_StatusStarting
// CollectorComponentStatusOK is when the collector component is in good health.
CollectorComponentStatusOK CollectorComponentStatus = cproto.CollectorComponentStatus_StatusOK
// CollectorComponentStatusRecoverableError is when the collector component had an error but can recover.
CollectorComponentStatusRecoverableError CollectorComponentStatus = cproto.CollectorComponentStatus_StatusRecoverableError
// CollectorComponentStatusPermanentError is when the collector component had a permanent error.
CollectorComponentStatusPermanentError CollectorComponentStatus = cproto.CollectorComponentStatus_StatusPermanentError
// CollectorComponentStatusFatalError is when the collector component had a fatal error.
CollectorComponentStatusFatalError CollectorComponentStatus = cproto.CollectorComponentStatus_StatusFatalError
// CollectorComponentStatusStopping is when the collector component is stopping.
CollectorComponentStatusStopping CollectorComponentStatus = cproto.CollectorComponentStatus_StatusStopping
// CollectorComponentStatusStopped is when the collector component is stopped.
CollectorComponentStatusStopped CollectorComponentStatus = cproto.CollectorComponentStatus_StatusStopped
)
const (
// CPU requests additional CPU diagnostics
CPU AdditionalMetrics = cproto.AdditionalDiagnosticRequest_CPU
)
// Version is the current running version of the daemon.
type Version struct {
Version string `json:"version" yaml:"version"`
Commit string `json:"commit" yaml:"commit"`
BuildTime time.Time `json:"build_time" yaml:"build_time"`
Snapshot bool `json:"snapshot" yaml:"snapshot"`
Fips bool `json:"fips" yaml:"fips"`
}
// ComponentVersionInfo is the version information for the component.
type ComponentVersionInfo struct {
// Name of the component.
Name string `json:"name" yaml:"name"`
// Extra meta information about the version.
Meta map[string]string `json:"meta,omitempty" yaml:"meta,omitempty"`
}
// ComponentUnitState is a state of a unit running inside a component.
type ComponentUnitState struct {
UnitID string `json:"unit_id" yaml:"unit_id"`
UnitType UnitType `json:"unit_type" yaml:"unit_type"`
State State `json:"state" yaml:"state"`
Message string `json:"message" yaml:"message"`
Payload map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"`
}
// ComponentState is a state of a component managed by the Elastic Agent.
type ComponentState struct {
ID string `json:"id" yaml:"id"`
Name string `json:"name" yaml:"name"`
State State `json:"state" yaml:"state"`
Message string `json:"message" yaml:"message"`
Units []ComponentUnitState `json:"units" yaml:"units"`
VersionInfo ComponentVersionInfo `json:"version_info" yaml:"version_info"`
}
// CollectorComponent is a state of a collector component managed by the Elastic Agent.
type CollectorComponent struct {
Status CollectorComponentStatus `json:"status" yaml:"status"`
Error string `json:"error,omitempty" yaml:"error,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
ComponentStatusMap map[string]*CollectorComponent `json:"components,omitempty" yaml:"components,omitempty"`
}
// AgentStateInfo is the overall information about the Elastic Agent.
type AgentStateInfo struct {
ID string `json:"id" yaml:"id"`
Version string `json:"version" yaml:"version"`
Commit string `json:"commit" yaml:"commit"`
BuildTime string `json:"build_time" yaml:"build_time"`
Snapshot bool `json:"snapshot" yaml:"snapshot"`
PID int32 `json:"pid" yaml:"pid"`
Unprivileged bool `json:"unprivileged" yaml:"unprivileged"`
IsManaged bool `json:"is_managed" yaml:"is_managed"`
}
// AgentState is the current state of the Elastic Agent.
type AgentState struct {
Info AgentStateInfo `json:"info" yaml:"info"`
State State `json:"state" yaml:"state"`
Message string `json:"message" yaml:"message"`
Components []ComponentState `json:"components" yaml:"components"`
FleetState State `yaml:"fleet_state"`
FleetMessage string `yaml:"fleet_message"`
UpgradeDetails *cproto.UpgradeDetails `json:"upgrade_details,omitempty" yaml:"upgrade_details,omitempty"`
Collector *CollectorComponent `json:"collector,omitempty" yaml:"collector,omitempty"`
}
// DiagnosticFileResult is a diagnostic file result.
type DiagnosticFileResult struct {
Name string
Filename string
Description string
ContentType string
Content []byte
Generated time.Time
}
// DiagnosticUnitRequest allows a specific unit to be targeted for diagnostics.
type DiagnosticUnitRequest struct {
ComponentID string
UnitID string
UnitType UnitType
}
// DiagnosticComponentRequest targets a specific component for diagnostics
type DiagnosticComponentRequest struct {
ComponentID string
}
// DiagnosticUnitResult is a set of results for a unit.
type DiagnosticUnitResult struct {
ComponentID string
UnitID string
UnitType UnitType
Err error
Results []DiagnosticFileResult
}
// DiagnosticComponentResult is a set of diagnostic results for a component
type DiagnosticComponentResult struct {
ComponentID string
Err error
Results []DiagnosticFileResult
}
// Client communicates to Elastic Agent through the control protocol.
type Client interface {
// Connect connects to the running Elastic Agent.
Connect(ctx context.Context, opts ...grpc.DialOption) error
// Disconnect disconnects from the running Elastic Agent.
Disconnect()
// Version returns the current version of the running agent.
Version(ctx context.Context) (Version, error)
// State returns the current state of the running agent.
State(ctx context.Context) (*AgentState, error)
// StateWatch watches the current state of the running agent.
StateWatch(ctx context.Context) (ClientStateWatch, error)
// Restart triggers restarting the current running daemon.
Restart(ctx context.Context) error
// Upgrade triggers upgrade of the current running daemon.
Upgrade(ctx context.Context, version string, sourceURI string, skipVerify bool, skipDefaultPgp bool, pgpBytes ...string) (string, error)
// DiagnosticAgent gathers diagnostics information for the running Elastic Agent.
DiagnosticAgent(ctx context.Context, additionalDiags []AdditionalMetrics) ([]DiagnosticFileResult, error)
// DiagnosticUnits gathers diagnostics information from specific units (or all if non are provided).
DiagnosticUnits(ctx context.Context, units ...DiagnosticUnitRequest) ([]DiagnosticUnitResult, error)
// DiagnosticComponents gathers diagnostic information for specific components
// the additionalDiags field specifies optional diagnostics that can also be collected.
DiagnosticComponents(ctx context.Context, additionalDiags []AdditionalMetrics, components ...DiagnosticComponentRequest) ([]DiagnosticComponentResult, error)
// Configure sends a new configuration to the Elastic Agent.
// Only works in the case that Elastic Agent is started in testing mode.
Configure(ctx context.Context, config string) error
}
// ClientStateWatch allows the state of the running Elastic Agent to be watched.
type ClientStateWatch interface {
// Recv receives the next agent state.
Recv() (*AgentState, error)
}
// Option is an option to adjust how the client operates.
type Option func(c *client)
// WithAddress adjust the connection address for the client.
func WithAddress(address string) Option {
return func(c *client) {
c.address = address
}
}
// WithMaxMsgSize adjures the GRPC connection maximum message size.
func WithMaxMsgSize(maxMsgSize int) Option {
return func(c *client) {
c.maxMsgSize = maxMsgSize
}
}
// client manages the state and communication to the Elastic Agent.
type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client cproto.ElasticAgentControlClient
address string
maxMsgSize int
}
// New creates a client connection to Elastic Agent.
func New(opts ...Option) Client {
cfg := configuration.DefaultGRPCConfig()
c := &client{
address: control.Address(),
maxMsgSize: cfg.MaxMsgSize,
}
for _, o := range opts {
o(c)
}
return c
}
// Connect connects to the running Elastic Agent.
func (c *client) Connect(ctx context.Context, opts ...grpc.DialOption) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx, c.address, c.maxMsgSize, opts...)
if err != nil {
return err
}
c.client = cproto.NewElasticAgentControlClient(conn)
return nil
}
// Disconnect disconnects from the running Elastic Agent.
func (c *client) Disconnect() {
if c.cancel != nil {
c.cancel()
c.wg.Wait()
c.ctx = nil
c.cancel = nil
}
}
// Version returns the current version of the running agent.
func (c *client) Version(ctx context.Context) (Version, error) {
res, err := c.client.Version(ctx, &cproto.Empty{})
if err != nil {
return Version{}, err
}
bt, err := time.Parse(control.TimeFormat(), res.BuildTime)
if err != nil {
return Version{}, err
}
return Version{
Version: res.Version,
Commit: res.Commit,
BuildTime: bt,
Snapshot: res.Snapshot,
}, nil
}
// State returns the current state of the running agent.
func (c *client) State(ctx context.Context) (*AgentState, error) {
res, err := c.client.State(ctx, &cproto.Empty{})
if err != nil {
return nil, err
}
return toState(res)
}
// StateWatch watches the current state of the running agent.
func (c *client) StateWatch(ctx context.Context) (ClientStateWatch, error) {
cli, err := c.client.StateWatch(ctx, &cproto.Empty{})
if err != nil {
return nil, err
}
return &stateWatcher{cli}, nil
}
// Restart triggers restarting the current running daemon.
func (c *client) Restart(ctx context.Context) error {
res, err := c.client.Restart(ctx, &cproto.Empty{})
if err != nil {
return err
}
if res.Status == cproto.ActionStatus_FAILURE {
return errors.New(res.Error)
}
return nil
}
// Upgrade triggers upgrade of the current running daemon.
func (c *client) Upgrade(ctx context.Context, version string, sourceURI string, skipVerify bool, skipDefaultPgp bool, pgpBytes ...string) (string, error) {
res, err := c.client.Upgrade(ctx, &cproto.UpgradeRequest{
Version: version,
SourceURI: sourceURI,
SkipVerify: skipVerify,
PgpBytes: pgpBytes,
SkipDefaultPgp: skipDefaultPgp,
})
if err != nil {
return "", err
}
if res.Status == cproto.ActionStatus_FAILURE {
return "", errors.New(res.Error)
}
return res.Version, nil
}
// DiagnosticAgent gathers diagnostics information for the running Elastic Agent.
func (c *client) DiagnosticAgent(ctx context.Context, additionalMetrics []AdditionalMetrics) ([]DiagnosticFileResult, error) {
resp, err := c.client.DiagnosticAgent(ctx, &cproto.DiagnosticAgentRequest{AdditionalMetrics: additionalMetrics})
if err != nil {
return nil, fmt.Errorf("error in DiagnosticAgent RPC call: %w", err)
}
files := make([]DiagnosticFileResult, 0, len(resp.Results))
for _, f := range resp.Results {
files = append(files, DiagnosticFileResult{
Name: f.Name,
Filename: f.Filename,
Description: f.Description,
ContentType: f.ContentType,
Content: f.Content,
Generated: f.Generated.AsTime(),
})
}
return files, nil
}
// DiagnosticComponents gathers diagnostic information for components running under elastic-agent
// errors at the DiagnosticComponents() level are returned as an error value, errors at the level of individual components are returned in
// the DiagnosticComponentResult struct.
func (c *client) DiagnosticComponents(ctx context.Context, additionalMetrics []AdditionalMetrics, components ...DiagnosticComponentRequest) ([]DiagnosticComponentResult, error) {
reqs := make([]*cproto.DiagnosticComponentRequest, 0, len(components))
for _, u := range components {
reqs = append(reqs, &cproto.DiagnosticComponentRequest{
ComponentId: u.ComponentID,
})
}
respStream, err := c.client.DiagnosticComponents(ctx, &cproto.DiagnosticComponentsRequest{AdditionalMetrics: additionalMetrics, Components: reqs})
if err != nil {
return nil, fmt.Errorf("error in DiagnosticComponents RPC call: %w", err)
}
results := []DiagnosticComponentResult{}
for {
compResp, err := respStream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("error reading response stream: %w", err)
}
resultFiles := []DiagnosticFileResult{}
for _, file := range compResp.Results {
resultFiles = append(resultFiles, DiagnosticFileResult{
Name: file.Name,
Filename: file.Filename,
Description: file.Description,
ContentType: file.ContentType,
Content: file.Content,
Generated: file.Generated.AsTime(),
})
}
if compResp.Error != "" {
err = errors.New(compResp.Error)
}
results = append(results, DiagnosticComponentResult{
ComponentID: compResp.ComponentId,
Err: err,
Results: resultFiles,
})
}
return results, nil
}
// DiagnosticUnits gathers diagnostics information from specific units (or all if non are provided).
func (c *client) DiagnosticUnits(ctx context.Context, units ...DiagnosticUnitRequest) ([]DiagnosticUnitResult, error) {
reqs := make([]*cproto.DiagnosticUnitRequest, 0, len(units))
for _, u := range units {
reqs = append(reqs, &cproto.DiagnosticUnitRequest{
ComponentId: u.ComponentID,
UnitType: u.UnitType,
UnitId: u.UnitID,
})
}
respStream, err := c.client.DiagnosticUnits(ctx, &cproto.DiagnosticUnitsRequest{Units: reqs})
if err != nil {
return nil, err
}
results := make([]DiagnosticUnitResult, 0)
for {
var u *cproto.DiagnosticUnitResponse
u, err = respStream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("failed to retrieve unit diagnostics: %w", err)
}
files := make([]DiagnosticFileResult, 0, len(u.Results))
for _, f := range u.Results {
files = append(files, DiagnosticFileResult{
Name: f.Name,
Filename: f.Filename,
Description: f.Description,
ContentType: f.ContentType,
Content: f.Content,
Generated: f.Generated.AsTime(),
})
}
var err error
if u.Error != "" {
err = errors.New(u.Error)
}
results = append(results, DiagnosticUnitResult{
ComponentID: u.ComponentId,
UnitID: u.UnitId,
UnitType: u.UnitType,
Err: err,
Results: files,
})
}
return results, nil
}
// Configure sends a new configuration to the Elastic Agent.
//
// Only works in the case that Elastic Agent is started in testing mode.
func (c *client) Configure(ctx context.Context, config string) error {
_, err := c.client.Configure(ctx, &cproto.ConfigureRequest{Config: config})
return err
}
type stateWatcher struct {
client cproto.ElasticAgentControl_StateWatchClient
}
// Recv receives the next agent state.
func (sw *stateWatcher) Recv() (*AgentState, error) {
resp, err := sw.client.Recv()
if err != nil {
return nil, err
}
return toState(resp)
}
func toState(res *cproto.StateResponse) (*AgentState, error) {
s := &AgentState{
Info: AgentStateInfo{
ID: res.Info.Id,
Version: res.Info.Version,
Commit: res.Info.Commit,
BuildTime: res.Info.BuildTime,
Snapshot: res.Info.Snapshot,
PID: res.Info.Pid,
Unprivileged: res.Info.Unprivileged,
IsManaged: res.Info.IsManaged,
},
State: res.State,
Message: res.Message,
FleetState: res.FleetState,
FleetMessage: res.FleetMessage,
UpgradeDetails: res.UpgradeDetails,
Components: make([]ComponentState, 0, len(res.Components)),
}
for _, comp := range res.Components {
units := make([]ComponentUnitState, 0, len(comp.Units))
for _, unit := range comp.Units {
var payload map[string]interface{}
if unit.Payload != "" {
err := json.Unmarshal([]byte(unit.Payload), &payload)
if err != nil {
return nil, err
}
}
units = append(units, ComponentUnitState{
UnitID: unit.UnitId,
UnitType: unit.UnitType,
State: unit.State,
Message: unit.Message,
Payload: payload,
})
}
cs := ComponentState{
ID: comp.Id,
Name: comp.Name,
State: comp.State,
Message: comp.Message,
Units: units,
}
if comp.VersionInfo != nil {
cs.VersionInfo = ComponentVersionInfo{
Name: comp.VersionInfo.Name,
Meta: comp.VersionInfo.Meta,
}
}
s.Components = append(s.Components, cs)
}
if res.Collector != nil {
cs, err := collectorToState(res.Collector)
if err != nil {
return nil, err
}
s.Collector = cs
}
return s, nil
}
func collectorToState(res *cproto.CollectorComponent) (*CollectorComponent, error) {
var t time.Time
var err error
if res.Timestamp != "" {
t, err = time.Parse(time.RFC3339Nano, res.Timestamp)
if err != nil {
return nil, err
}
}
cc := &CollectorComponent{
Status: res.Status,
Error: res.Error,
Timestamp: t,
}
if res.ComponentStatusMap != nil {
cc.ComponentStatusMap = make(map[string]*CollectorComponent, len(res.ComponentStatusMap))
for id, compStatus := range res.ComponentStatusMap {
cs, err := collectorToState(compStatus)
if err != nil {
return nil, err
}
cc.ComponentStatusMap[id] = cs
}
}
return cc, nil
}