pkg/control/v2/server/server.go (388 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"time"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/component/componentstatus"
"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v1/proto"
v1server "github.com/elastic/elastic-agent/pkg/control/v1/server"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
"go.elastic.co/apm/module/apmgrpc/v2"
"go.elastic.co/apm/v2"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
// TestModeConfigSetter is used only for testing mode.
//
// When installed on the Server (via SetTestModeConfigSetter) it allows the
// Configure RPC to push a raw configuration string into the running agent.
type TestModeConfigSetter interface {
	// SetConfig sets the configuration. cfg is the raw configuration payload
	// received over the control protocol; the error is returned to the caller.
	SetConfig(ctx context.Context, cfg string) error
}
// Server is the daemon side of the control protocol.
type Server struct {
	cproto.UnimplementedElasticAgentControlServer

	logger     *logger.Logger           // structured logger for server events
	agentInfo  info.Agent               // identity/version info reported in State responses
	coord      *coordinator.Coordinator // coordinator the RPCs delegate to (state, upgrade, diagnostics)
	listener   net.Listener             // control socket listener; non-nil only while started
	server     *grpc.Server             // gRPC server; non-nil means Start has been called
	tracer     *apm.Tracer              // optional APM tracer; enables the unary interceptor when set
	diagHooks  diagnostics.Hooks        // agent-level diagnostic hooks run by DiagnosticAgent
	grpcConfig *configuration.GRPCConfig // gRPC tuning (e.g. max receive message size)
	tmSetter   TestModeConfigSetter     // nil unless testing mode is enabled (see Configure)
}
// New creates a new control protocol server.
//
// The returned server is not yet listening; call Start to open the
// control socket and begin serving.
func New(log *logger.Logger, agentInfo info.Agent, coord *coordinator.Coordinator, tracer *apm.Tracer, diagHooks diagnostics.Hooks, grpcConfig *configuration.GRPCConfig) *Server {
	srv := &Server{}
	srv.logger = log
	srv.agentInfo = agentInfo
	srv.coord = coord
	srv.tracer = tracer
	srv.diagHooks = diagHooks
	srv.grpcConfig = grpcConfig
	return srv
}
// SetTestModeConfigSetter sets the test mode configuration setter.
//
// Until a setter is installed, the Configure RPC rejects all requests
// with a "testing mode is not enabled" error.
func (s *Server) SetTestModeConfigSetter(setter TestModeConfigSetter) {
	s.tmSetter = setter
}
// Start starts the GRPC endpoint and accepts new connections.
//
// Calling Start on an already-started server is a no-op and returns nil.
// On success the listener is serving on a background goroutine that runs
// until Stop is called or the listener fails; Serve errors are logged,
// not returned.
func (s *Server) Start() error {
	if s.server != nil {
		// already started
		return nil
	}
	lis, err := createListener(s.logger)
	if err != nil {
		s.logger.Errorf("unable to create listener: %s", err)
		return err
	}
	s.logger.With("address", control.Address()).Infof("GRPC control socket listening at %s", control.Address())
	s.listener = lis

	// Build the server options once; the APM unary interceptor is only
	// added when tracing is enabled. This avoids duplicating the
	// grpc.NewServer call in both branches.
	opts := []grpc.ServerOption{grpc.MaxRecvMsgSize(s.grpcConfig.MaxMsgSize)}
	if s.tracer != nil {
		apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(s.tracer))
		opts = append(opts, grpc.UnaryInterceptor(apmInterceptor))
	}
	s.server = grpc.NewServer(opts...)
	cproto.RegisterElasticAgentControlServer(s.server, s)

	// Also register the v1 control server, wrapping this v2 server so
	// clients speaking the older protocol are still handled.
	v1Wrapper := v1server.New(s.logger, s, s.tracer)
	proto.RegisterElasticAgentControlServer(s.server, v1Wrapper)

	// start serving GRPC connections
	go func() {
		err := s.server.Serve(lis)
		if err != nil {
			s.logger.Errorf("error listening for GRPC: %s", err)
		}
	}()
	return nil
}
// Stop stops the GRPC endpoint.
//
// Safe to call when the server was never started or was already stopped.
func (s *Server) Stop() {
	if s.server == nil {
		return
	}
	s.server.Stop()
	s.server = nil
	s.listener = nil
	cleanupListener(s.logger)
}
// Version returns the currently running version.
func (s *Server) Version(_ context.Context, _ *cproto.Empty) (*cproto.VersionResponse, error) {
	resp := &cproto.VersionResponse{}
	resp.Version = release.Version()
	resp.Commit = release.Commit()
	resp.BuildTime = release.BuildTime().Format(control.TimeFormat())
	resp.Snapshot = release.Snapshot()
	resp.Fips = release.FIPSDistribution()
	return resp, nil
}
// State returns the overall state of the agent.
func (s *Server) State(_ context.Context, _ *cproto.Empty) (*cproto.StateResponse, error) {
	// Copy into a local so stateToProto can take its address.
	state := s.coord.State()
	return stateToProto(&state, s.agentInfo)
}
// StateWatch streams the current state of the Elastic Agent to the client,
// sending a new message every time the coordinator publishes a state change,
// until the stream's context is cancelled.
//
// TODO: Should we expose the subscription buffer size in the RPC? This
// would e.g. let subscribers who only care about the latest state set a
// buffer size of 0 so they will always receive the most recent value
// instead of the full sequence.
func (s *Server) StateWatch(_ *cproto.Empty, srv cproto.ElasticAgentControl_StateWatchServer) error {
	ctx := srv.Context()
	stateChan := s.coord.StateSubscribe(ctx, 32)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case st := <-stateChan:
			resp, err := stateToProto(&st, s.agentInfo)
			if err != nil {
				return err
			}
			if err := srv.Send(resp); err != nil {
				return err
			}
		}
	}
}
// Restart performs re-exec.
//
// The re-exec is delegated to the coordinator; the RPC reports SUCCESS
// immediately rather than waiting for the restart to complete.
func (s *Server) Restart(_ context.Context, _ *cproto.Empty) (*cproto.RestartResponse, error) {
	s.coord.ReExec(nil)
	return &cproto.RestartResponse{
		Status: cproto.ActionStatus_SUCCESS,
	}, nil
}
// Upgrade performs the upgrade operation.
//
// Upgrade failures are reported in-band via the response's Status/Error
// fields rather than as a transport-level error.
func (s *Server) Upgrade(ctx context.Context, request *cproto.UpgradeRequest) (*cproto.UpgradeResponse, error) {
	err := s.coord.Upgrade(ctx, request.Version, request.SourceURI, nil, request.SkipVerify, request.SkipDefaultPgp, request.PgpBytes...)
	if err == nil {
		return &cproto.UpgradeResponse{
			Status:  cproto.ActionStatus_SUCCESS,
			Version: request.Version,
		}, nil
	}
	//nolint:nilerr // ignore the error, return a failure upgrade response
	return &cproto.UpgradeResponse{
		Status: cproto.ActionStatus_FAILURE,
		Error:  err.Error(),
	}, nil
}
// DiagnosticAgent returns diagnostic information for this running Elastic Agent.
//
// Each registered diagnostic hook is run in turn, then any additionally
// requested metrics (currently only CPU profiling) are collected. The
// context is checked between hooks so a cancelled request stops early.
func (s *Server) DiagnosticAgent(ctx context.Context, req *cproto.DiagnosticAgentRequest) (*cproto.DiagnosticAgentResponse, error) {
	results := make([]*cproto.DiagnosticFileResult, 0, len(s.diagHooks))
	for _, hook := range s.diagHooks {
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}
		results = append(results, &cproto.DiagnosticFileResult{
			Name:        hook.Name,
			Filename:    hook.Filename,
			Description: hook.Description,
			ContentType: hook.ContentType,
			Content:     hook.Hook(ctx),
			Generated:   timestamppb.New(time.Now().UTC()),
		})
	}
	for _, metric := range req.AdditionalMetrics {
		// Only CPU profiling is supported; other values are ignored.
		if metric != cproto.AdditionalDiagnosticRequest_CPU {
			continue
		}
		duration := diagnostics.DiagCPUDuration
		s.logger.Infof("Collecting CPU metrics, waiting for %s", duration)
		cpuResults, err := diagnostics.CreateCPUProfile(ctx, duration)
		if err != nil {
			return nil, fmt.Errorf("error gathering CPU profile: %w", err)
		}
		results = append(results, &cproto.DiagnosticFileResult{
			Name:        diagnostics.DiagCPUName,
			Filename:    diagnostics.DiagCPUFilename,
			Description: diagnostics.DiagCPUDescription,
			ContentType: diagnostics.DiagCPUContentType,
			Content:     cpuResults,
			Generated:   timestamppb.New(time.Now().UTC()),
		})
	}
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	return &cproto.DiagnosticAgentResponse{Results: results}, nil
}
// DiagnosticComponents returns diagnostic information for the given components,
// streaming one DiagnosticComponentResponse per component back to the client.
//
// A failure to collect diagnostics for an individual component is reported
// in that component's response Error field; only collection-wide or
// stream-send failures abort the RPC.
func (s *Server) DiagnosticComponents(req *cproto.DiagnosticComponentsRequest, respServ cproto.ElasticAgentControl_DiagnosticComponentsServer) error {
	// Translate the request into the coordinator's component identifiers.
	// Pre-size: one entry per requested component.
	reqs := make([]component.Component, 0, len(req.Components))
	for _, comp := range req.Components {
		reqs = append(reqs, component.Component{ID: comp.GetComponentId()})
	}

	diags, err := s.coord.PerformComponentDiagnostics(respServ.Context(), req.AdditionalMetrics, reqs...)
	if err != nil {
		return fmt.Errorf("error fetching component-level diagnostics: %w", err)
	}

	for _, diag := range diags {
		// Pre-size: one file result per collected diagnostic file.
		respFiles := make([]*cproto.DiagnosticFileResult, 0, len(diag.Results))
		for _, file := range diag.Results {
			respFiles = append(respFiles, &cproto.DiagnosticFileResult{
				Name:        file.Name,
				Filename:    file.Filename,
				Description: file.Description,
				ContentType: file.ContentType,
				Content:     file.Content,
				Generated:   file.Generated,
			})
		}
		respStruct := &cproto.DiagnosticComponentResponse{
			ComponentId: diag.Component.ID,
			Results:     respFiles,
		}
		if diag.Err != nil {
			respStruct.Error = diag.Err.Error()
		}
		if err := respServ.Send(respStruct); err != nil {
			return fmt.Errorf("error sending response: %w", err)
		}
	}
	return nil
}
// DiagnosticUnits returns diagnostic information for the specific units (or all units if non-provided).
//
// One DiagnosticUnitResponse is streamed per unit; a per-unit collection
// failure is carried in that response's Error field.
func (s *Server) DiagnosticUnits(req *cproto.DiagnosticUnitsRequest, srv cproto.ElasticAgentControl_DiagnosticUnitsServer) error {
	unitReqs := make([]runtime.ComponentUnitDiagnosticRequest, 0, len(req.Units))
	for _, unit := range req.Units {
		unitReqs = append(unitReqs, runtime.ComponentUnitDiagnosticRequest{
			Component: component.Component{
				ID: unit.ComponentId,
			},
			Unit: component.Unit{
				ID:   unit.UnitId,
				Type: client.UnitType(unit.UnitType),
			},
		})
	}
	for _, d := range s.coord.PerformDiagnostics(srv.Context(), unitReqs...) {
		resp := &cproto.DiagnosticUnitResponse{
			ComponentId: d.Component.ID,
			UnitType:    cproto.UnitType(d.Unit.Type),
			UnitId:      d.Unit.ID,
			Error:       "",
			Results:     nil,
		}
		if d.Err != nil {
			resp.Error = d.Err.Error()
		} else {
			files := make([]*cproto.DiagnosticFileResult, 0, len(d.Results))
			for _, fileResult := range d.Results {
				files = append(files, &cproto.DiagnosticFileResult{
					Name:        fileResult.Name,
					Filename:    fileResult.Filename,
					Description: fileResult.Description,
					ContentType: fileResult.ContentType,
					Content:     fileResult.Content,
					Generated:   fileResult.Generated,
				})
			}
			resp.Results = files
		}
		if err := srv.Send(resp); err != nil {
			return err
		}
	}
	return nil
}
// Configure configures the running Elastic Agent configuration.
//
// Only available in testing mode; when no test-mode setter has been
// installed the request is rejected.
func (s *Server) Configure(ctx context.Context, req *cproto.ConfigureRequest) (*cproto.Empty, error) {
	if s.tmSetter == nil {
		return nil, errors.New("testing mode is not enabled")
	}
	if err := s.tmSetter.SetConfig(ctx, req.Config); err != nil {
		return nil, err
	}
	return &cproto.Empty{}, nil
}
// stateToProto converts the coordinator's in-memory State into the
// control-protocol StateResponse message, including per-component and
// per-unit state, optional upgrade details, and agent identity info.
func stateToProto(state *coordinator.State, agentInfo info.Agent) (*cproto.StateResponse, error) {
	var err error
	components := make([]*cproto.ComponentState, 0, len(state.Components))
	for _, comp := range state.Components {
		units := make([]*cproto.ComponentUnitState, 0, len(comp.State.Units))
		for key, unit := range comp.State.Units {
			// Empty payload by default; only marshal when one is present.
			payload := []byte("")
			if unit.Payload != nil {
				payload, err = json.Marshal(unit.Payload)
				if err != nil {
					return nil, fmt.Errorf("failed to marshal component %s unit %s payload: %w", comp.Component.ID, key.UnitID, err)
				}
			}
			units = append(units, &cproto.ComponentUnitState{
				UnitType: cproto.UnitType(key.UnitType),
				UnitId:   key.UnitID,
				State:    cproto.State(unit.State),
				Message:  unit.Message,
				Payload:  string(payload),
			})
		}
		components = append(components, &cproto.ComponentState{
			Id:      comp.Component.ID,
			Name:    comp.Component.Type(),
			State:   cproto.State(comp.State.State),
			Message: comp.State.Message,
			Units:   units,
			VersionInfo: &cproto.ComponentVersionInfo{
				Name: comp.State.VersionInfo.Name,
				Meta: comp.State.VersionInfo.Meta,
			},
		})
	}
	// Upgrade details are optional; leave nil when no upgrade is in flight.
	var upgradeDetails *cproto.UpgradeDetails
	if state.UpgradeDetails != nil {
		upgradeDetails = &cproto.UpgradeDetails{
			TargetVersion: state.UpgradeDetails.TargetVersion,
			State:         string(state.UpgradeDetails.State),
			ActionId:      state.UpgradeDetails.ActionID,
			Metadata: &cproto.UpgradeDetailsMetadata{
				DownloadPercent: float32(state.UpgradeDetails.Metadata.DownloadPercent),
				FailedState:     string(state.UpgradeDetails.Metadata.FailedState),
				ErrorMsg:        state.UpgradeDetails.Metadata.ErrorMsg,
				RetryErrorMsg:   state.UpgradeDetails.Metadata.RetryErrorMsg,
			},
		}
		// The timestamps are formatted only when set and non-zero, so the
		// proto fields stay empty strings otherwise.
		if state.UpgradeDetails.Metadata.ScheduledAt != nil &&
			!state.UpgradeDetails.Metadata.ScheduledAt.IsZero() {
			upgradeDetails.Metadata.ScheduledAt = state.UpgradeDetails.Metadata.ScheduledAt.Format(control.TimeFormat())
		}
		if state.UpgradeDetails.Metadata.RetryUntil != nil &&
			!state.UpgradeDetails.Metadata.RetryUntil.IsZero() {
			upgradeDetails.Metadata.RetryUntil = state.UpgradeDetails.Metadata.RetryUntil.Format(control.TimeFormat())
		}
	}
	return &cproto.StateResponse{
		Info: &cproto.StateAgentInfo{
			Id:           agentInfo.AgentID(),
			Version:      release.Version(),
			Commit:       release.Commit(),
			BuildTime:    release.BuildTime().Format(control.TimeFormat()),
			Snapshot:     release.Snapshot(),
			Pid:          int32(os.Getpid()), //nolint:gosec // not going to have a pid greater than 32bit integer
			Unprivileged: agentInfo.Unprivileged(),
			IsManaged:    !agentInfo.IsStandalone(),
		},
		State:          state.State,
		Message:        state.Message,
		FleetState:     state.FleetState,
		FleetMessage:   state.FleetMessage,
		Components:     components,
		UpgradeDetails: upgradeDetails,
		Collector:      collectorToProto(state.Collector),
	}, nil
}
// collectorToProto recursively converts an OTel collector aggregate status
// tree into its control-protocol representation. Returns nil for nil input.
func collectorToProto(s *status.AggregateStatus) *cproto.CollectorComponent {
	if s == nil {
		return nil
	}
	out := &cproto.CollectorComponent{
		Status:    otelComponentStatusToProto(s.Status()),
		Timestamp: s.Timestamp().Format(time.RFC3339Nano),
	}
	if s.Err() != nil {
		out.Error = s.Err().Error()
	}
	if n := len(s.ComponentStatusMap); n > 0 {
		out.ComponentStatusMap = make(map[string]*cproto.CollectorComponent, n)
		for id, nested := range s.ComponentStatusMap {
			out.ComponentStatusMap[id] = collectorToProto(nested)
		}
	}
	return out
}
// otelComponentStatusToProto maps an OTel collector component status to the
// equivalent control-protocol status. Unknown values map to StatusNone.
func otelComponentStatusToProto(s componentstatus.Status) cproto.CollectorComponentStatus {
	switch s {
	case componentstatus.StatusStarting:
		return cproto.CollectorComponentStatus_StatusStarting
	case componentstatus.StatusOK:
		return cproto.CollectorComponentStatus_StatusOK
	case componentstatus.StatusRecoverableError:
		return cproto.CollectorComponentStatus_StatusRecoverableError
	case componentstatus.StatusPermanentError:
		return cproto.CollectorComponentStatus_StatusPermanentError
	case componentstatus.StatusFatalError:
		return cproto.CollectorComponentStatus_StatusFatalError
	case componentstatus.StatusStopping:
		return cproto.CollectorComponentStatus_StatusStopping
	case componentstatus.StatusStopped:
		return cproto.CollectorComponentStatus_StatusStopped
	default:
		// Covers componentstatus.StatusNone and any future/unknown values.
		return cproto.CollectorComponentStatus_StatusNone
	}
}