cmd/opampsupervisor/supervisor/supervisor.go:
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package supervisor
import (
"bufio"
"bytes"
"context"
"crypto/tls"
_ "embed"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"text/template"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/knadh/koanf/maps"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
serverTypes "github.com/open-telemetry/opamp-go/server/types"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.opentelemetry.io/contrib/bridges/otelzap"
telemetryconfig "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/commander"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/healthchecker"
)
var (
//go:embed templates/nooppipeline.yaml
noopPipelineTpl string
//go:embed templates/extraconfig.yaml
extraConfigTpl string
//go:embed templates/opampextension.yaml
opampextensionTpl string
//go:embed templates/owntelemetry.yaml
ownTelemetryTpl string
lastRecvRemoteConfigFile = "last_recv_remote_config.dat"
lastRecvOwnTelemetryConfigFile = "last_recv_own_telemetry_config.dat"
errNonMatchingInstanceUID = errors.New("received collector instance UID does not match expected UID set by the supervisor")
)
const (
persistentStateFileName = "persistent_state.yaml"
agentConfigFileName = "effective.yaml"
AllowNoPipelinesFeatureGate = "service.AllowNoPipelines"
)
const maxBufferedCustomMessages = 10
type configState struct {
// Supervisor-assembled config to be given to the Collector.
mergedConfig string
// true if the server-provided config map was empty
configMapIsEmpty bool
}
func (c *configState) equal(other *configState) bool {
return other.mergedConfig == c.mergedConfig && other.configMapIsEmpty == c.configMapIsEmpty
}
type agentStartStatus string
var (
agentStarting agentStartStatus = "starting"
agentNotStarting agentStartStatus = "notStarting"
)
type telemetrySettings struct {
component.TelemetrySettings
loggerProvider log.LoggerProvider
}
// Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient
// to work with an OpAMP Server.
type Supervisor struct {
pidProvider pidProvider
// Commander that starts/stops the Agent process.
commander *commander.Commander
startedAt time.Time
healthCheckTicker *backoff.Ticker
healthChecker *healthchecker.HTTPHealthChecker
lastHealthCheckErr error
// Supervisor's own config.
config config.Supervisor
agentDescription *atomic.Value
availableComponents *atomic.Value
// Supervisor's persistent state
persistentState *persistentState
noopPipelineTemplate *template.Template
opampextensionTemplate *template.Template
extraConfigTemplate *template.Template
ownTelemetryTemplate *template.Template
agentConn *atomic.Value
// A config section to be added to the Collector's config to fetch its own metrics.
// TODO: store this persistently so that when starting we can compose the effective
// config correctly.
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21078
agentConfigOwnMetricsSection *atomic.Value
// agentHealthCheckEndpoint is the endpoint the Collector's health check extension
// will listen on for health check requests from the Supervisor.
agentHealthCheckEndpoint string
// Internal config state for agent use. See the configState struct for more details.
cfgState *atomic.Value
// Final effective config of the Collector.
effectiveConfig *atomic.Value
// Last received remote config.
remoteConfig *protobufs.AgentRemoteConfig
// A channel to indicate there is a new config to apply.
hasNewConfig chan struct{}
// configApplyTimeout is the maximum time to wait for the agent to apply a new config.
// After this time passes without the agent reporting health as OK, the agent is considered unhealthy.
configApplyTimeout time.Duration
// lastHealthFromClient is the last health status of the agent received from the client.
lastHealthFromClient *protobufs.ComponentHealth
// lastHealth is the last health status of the agent.
lastHealth *protobufs.ComponentHealth
// The OpAMP client to connect to the OpAMP Server.
opampClient client.OpAMPClient
doneChan chan struct{}
agentWG sync.WaitGroup
customMessageToServer chan *protobufs.CustomMessage
customMessageWG sync.WaitGroup
// agentHasStarted is true if the agent has started.
agentHasStarted bool
// agentStartHealthCheckAttempts is the number of health check attempts made by the agent since it started.
agentStartHealthCheckAttempts int
// agentRestarting is true if the agent is restarting.
agentRestarting atomic.Bool
// The OpAMP server to communicate with the Collector's OpAMP extension
opampServer server.OpAMPServer
opampServerPort int
telemetrySettings telemetrySettings
featureGates map[string]struct{}
}
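// NewSupervisor creates a Supervisor from the given configuration: it parses the
// embedded config templates, initializes the Supervisor's own telemetry, validates
// the config, and creates the storage directory. The Collector is not launched
// until Start is called. A minimal usage sketch from a caller's perspective
// (logger and cfg construction elided):
//
//	sup, err := supervisor.NewSupervisor(logger, cfg)
//	if err != nil {
//		return err
//	}
//	if err := sup.Start(); err != nil {
//		return err
//	}
//	defer sup.Shutdown()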
func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) {
s := &Supervisor{
pidProvider: defaultPIDProvider{},
hasNewConfig: make(chan struct{}, 1),
agentConfigOwnMetricsSection: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
availableComponents: &atomic.Value{},
doneChan: make(chan struct{}),
customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages),
agentConn: &atomic.Value{},
featureGates: map[string]struct{}{},
}
if err := s.createTemplates(); err != nil {
return nil, err
}
telSettings, err := initTelemetrySettings(logger, cfg.Telemetry)
if err != nil {
return nil, err
}
s.telemetrySettings = telSettings
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("error validating config: %w", err)
}
s.config = cfg
if err := os.MkdirAll(s.config.Storage.Directory, 0o700); err != nil {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}
s.configApplyTimeout = s.config.Agent.ConfigApplyTimeout
return s, nil
}
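// initTelemetrySettings builds the Supervisor's self-telemetry providers (metrics,
// traces, logs) from the telemetry config. When log processors are configured, the
// zap logger is teed into the OTel logger provider via the otelzap bridge.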
func initTelemetrySettings(logger *zap.Logger, cfg config.Telemetry) (telemetrySettings, error) {
readers := cfg.Metrics.Readers
if cfg.Metrics.Level == configtelemetry.LevelNone {
readers = []telemetryconfig.MetricReader{}
}
pcommonRes := pcommon.NewResource()
for k, v := range cfg.Resource {
pcommonRes.Attributes().PutStr(k, *v)
}
if _, ok := cfg.Resource[semconv.AttributeServiceName]; !ok {
pcommonRes.Attributes().PutStr(semconv.AttributeServiceName, "opamp-supervisor")
}
if _, ok := cfg.Resource[semconv.AttributeServiceInstanceID]; !ok {
instanceUUID, _ := uuid.NewRandom()
instanceID := instanceUUID.String()
pcommonRes.Attributes().PutStr(semconv.AttributeServiceInstanceID, instanceID)
}
// TODO: build info with the Collector version is not currently available to set semconv.AttributeServiceVersion
var attrs []telemetryconfig.AttributeNameValue
for k, v := range pcommonRes.Attributes().All() {
attrs = append(attrs, telemetryconfig.AttributeNameValue{Name: k, Value: v.Str()})
}
sch := semconv.SchemaURL
ctx := context.Background()
sdk, err := telemetryconfig.NewSDK(
telemetryconfig.WithContext(ctx),
telemetryconfig.WithOpenTelemetryConfiguration(
telemetryconfig.OpenTelemetryConfiguration{
MeterProvider: &telemetryconfig.MeterProvider{
Readers: readers,
},
TracerProvider: &telemetryconfig.TracerProvider{
Processors: cfg.Traces.Processors,
},
LoggerProvider: &telemetryconfig.LoggerProvider{
Processors: cfg.Logs.Processors,
},
Resource: &telemetryconfig.Resource{
SchemaUrl: &sch,
Attributes: attrs,
},
},
),
)
if err != nil {
return telemetrySettings{}, err
}
var lp log.LoggerProvider
if len(cfg.Logs.Processors) > 0 {
lp = sdk.LoggerProvider()
logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
core, err := zapcore.NewIncreaseLevelCore(zapcore.NewTee(
c,
otelzap.NewCore("github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor",
otelzap.WithLoggerProvider(lp),
),
), zap.NewAtomicLevelAt(cfg.Logs.Level))
if err != nil {
panic(err)
}
return core
}))
}
return telemetrySettings{
component.TelemetrySettings{
Logger: logger,
TracerProvider: sdk.TracerProvider(),
MeterProvider: sdk.MeterProvider(),
Resource: pcommonRes,
},
lp,
}, nil
}
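// Start runs the Supervisor: it loads (or creates) persistent state, queries the
// Collector for feature gates and bootstrap info, composes and writes the initial
// merged config, starts the OpAMP client and server, and launches the goroutines
// that manage the agent process and forward custom messages.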
func (s *Supervisor) Start() error {
var err error
s.persistentState, err = loadOrCreatePersistentState(s.persistentStateFilePath())
if err != nil {
return err
}
if err = s.getFeatureGates(); err != nil {
return fmt.Errorf("could not get feature gates from the Collector: %w", err)
}
if err = s.getBootstrapInfo(); err != nil {
return fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
}
healthCheckPort := s.config.Agent.HealthCheckPort
if healthCheckPort == 0 {
healthCheckPort, err = s.findRandomPort()
if err != nil {
return fmt.Errorf("could not find port for health check: %w", err)
}
}
s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort)
s.telemetrySettings.Logger.Info("Supervisor starting",
zap.String("id", s.persistentState.InstanceID.String()))
err = s.loadAndWriteInitialMergedConfig()
if err != nil {
return fmt.Errorf("failed loading initial config: %w", err)
}
if err = s.startOpAMP(); err != nil {
return fmt.Errorf("cannot start OpAMP client: %w", err)
}
s.commander, err = commander.NewCommander(
s.telemetrySettings.Logger,
s.config.Storage.Directory,
s.config.Agent,
"--config", s.agentConfigFilePath(),
)
if err != nil {
return err
}
s.startHealthCheckTicker()
s.agentWG.Add(1)
go func() {
defer s.agentWG.Done()
s.runAgentProcess()
}()
s.customMessageWG.Add(1)
go func() {
defer s.customMessageWG.Done()
s.forwardCustomMessagesToServerLoop()
}()
return nil
}
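// getFeatureGates runs the Collector's "featuregate" subcommand as a one-shot
// process and records which of the gates the Supervisor relies on (currently only
// service.AllowNoPipelines) are known to the Collector binary.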
func (s *Supervisor) getFeatureGates() error {
cmd, err := commander.NewCommander(
s.telemetrySettings.Logger,
s.config.Storage.Directory,
s.config.Agent,
"featuregate",
)
if err != nil {
return err
}
stdout, _, err := cmd.StartOneShot()
if err != nil {
return err
}
scanner := bufio.NewScanner(bytes.NewBuffer(stdout))
// First line only contains headers, discard it.
_ = scanner.Scan()
for scanner.Scan() {
line := scanner.Text()
i := strings.Index(line, " ")
if i < 0 {
// Skip lines without the expected "<flag> <state>" layout.
continue
}
flag := line[0:i]
if flag == AllowNoPipelinesFeatureGate {
s.featureGates[AllowNoPipelinesFeatureGate] = struct{}{}
}
}
if err := scanner.Err(); err != nil {
s.telemetrySettings.Logger.Error("Failed to scan feature gates output", zap.Error(err))
}
return nil
}
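// createTemplates parses the embedded templates used to assemble the Collector's
// config: the noop pipeline, the extra local config, the OpAMP extension config,
// and the own-telemetry section.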
func (s *Supervisor) createTemplates() error {
var err error
if s.noopPipelineTemplate, err = template.New("nooppipeline").Parse(noopPipelineTpl); err != nil {
return err
}
if s.extraConfigTemplate, err = template.New("extraconfig").Parse(extraConfigTpl); err != nil {
return err
}
if s.opampextensionTemplate, err = template.New("opampextension").Parse(opampextensionTpl); err != nil {
return err
}
if s.ownTelemetryTemplate, err = template.New("owntelemetry").Parse(ownTelemetryTpl); err != nil {
return err
}
return nil
}
// getBootstrapInfo obtains the Collector's agent description by
// starting a Collector with a minimal config that only runs an
// OpAMP extension, waiting for the extension to report the agent
// description, then shutting the Collector down. This only needs
// to happen once per Collector binary.
func (s *Supervisor) getBootstrapInfo() (err error) {
_, span := s.getTracer().Start(context.Background(), "GetBootstrapInfo")
defer span.End()
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not get supervisor OpAMP server port: %v", err))
return err
}
bootstrapConfig, err := s.composeNoopConfig()
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not compose noop config: %v", err))
return err
}
err = os.WriteFile(s.agentConfigFilePath(), bootstrapConfig, 0o600)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to write agent config: %v", err))
return fmt.Errorf("failed to write agent config: %w", err)
}
srv := server.New(newLoggerFromZap(s.telemetrySettings.Logger, "opamp-server"))
done := make(chan error, 1)
var connected atomic.Bool
var doneReported atomic.Bool
// Start a one-shot server to get the Collector's agent description
// and available components using the Collector's OpAMP extension.
err = srv.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnecting: func(_ *http.Request) (bool, int) {
connected.Store(true)
return true, http.StatusOK
},
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
response := &protobufs.ServerToAgent{}
if message.GetAvailableComponents() != nil {
s.setAvailableComponents(message.AvailableComponents)
}
if message.AgentDescription != nil {
instanceIDSeen := false
s.setAgentDescription(message.AgentDescription)
identAttr := message.AgentDescription.IdentifyingAttributes
for _, attr := range identAttr {
if attr.Key == semconv.AttributeServiceInstanceID {
if attr.Value.GetStringValue() != s.persistentState.InstanceID.String() {
done <- fmt.Errorf(
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s): %w",
attr.Value.GetStringValue(),
s.persistentState.InstanceID.String(),
errNonMatchingInstanceUID,
)
return response
}
instanceIDSeen = true
}
}
if !instanceIDSeen {
done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
return response
}
}
// agent description must be defined
_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
if !ok {
return response
}
// if available components have not been reported, agent description is sufficient to continue
availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
if availableComponentsOk {
// must have a full list of components if available components have been reported
if availableComponents.GetComponents() != nil {
if !doneReported.Load() {
done <- nil
doneReported.Store(true)
}
} else {
// if we don't have a full component list, ask for it
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
}
return response
}
// Report done only once, not on each message; a second send on the buffered channel would block this handler forever.
if !doneReported.Load() {
done <- nil
doneReported.Store(true)
}
return response
},
}.toServerSettings())
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not start OpAMP server: %v", err))
return err
}
defer func() {
if stopErr := srv.Stop(context.Background()); stopErr != nil {
err = errors.Join(err, fmt.Errorf("error when stopping the opamp server: %w", stopErr))
}
}()
flags := []string{
"--config", s.agentConfigFilePath(),
}
featuregateFlag := s.getFeatureGateFlag()
if len(featuregateFlag) > 0 {
flags = append(flags, featuregateFlag...)
}
cmd, err := commander.NewCommander(
s.telemetrySettings.Logger,
s.config.Storage.Directory,
s.config.Agent,
flags...,
)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
return err
}
if err = cmd.Start(context.Background()); err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
return err
}
defer func() {
if stopErr := cmd.Stop(context.Background()); stopErr != nil {
err = errors.Join(err, fmt.Errorf("error when stopping the collector: %w", stopErr))
}
}()
select {
case <-time.After(s.config.Agent.BootstrapTimeout):
if connected.Load() {
msg := "collector connected but never responded with an AgentDescription message"
span.SetStatus(codes.Error, msg)
return errors.New(msg)
} else {
msg := "collector's OpAMP client never connected to the Supervisor"
span.SetStatus(codes.Error, msg)
return errors.New(msg)
}
case err = <-done:
if errors.Is(err, errNonMatchingInstanceUID) {
// try to report the issue to the OpAMP server
if startOpAMPErr := s.startOpAMPClient(); startOpAMPErr == nil {
defer func(s *Supervisor) {
if stopErr := s.stopOpAMPClient(); stopErr != nil {
s.telemetrySettings.Logger.Error("Could not stop OpAMP client", zap.Error(stopErr))
}
}(s)
if healthErr := s.opampClient.SetHealth(&protobufs.ComponentHealth{
Healthy: false,
LastError: err.Error(),
}); healthErr != nil {
s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(healthErr))
}
} else {
s.telemetrySettings.Logger.Error("Could not start OpAMP client to report health to server", zap.Error(startOpAMPErr))
}
}
if err != nil {
s.telemetrySettings.Logger.Error("Could not complete bootstrap", zap.Error(err))
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
return err
}
}
func (s *Supervisor) startOpAMP() error {
if err := s.startOpAMPClient(); err != nil {
return err
}
if err := s.startOpAMPServer(); err != nil {
return err
}
return nil
}
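// startOpAMPClient connects the Supervisor to the remote OpAMP server. The client
// transport (WebSocket or plain HTTP) is chosen from the endpoint scheme, TLS is
// loaded for wss/https endpoints, and the Supervisor's callbacks, agent
// description, health, and available components are registered before starting.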
func (s *Supervisor) startOpAMPClient() error {
// determine if we need to load a TLS config or not
var tlsConfig *tls.Config
parsedURL, err := url.Parse(s.config.Server.Endpoint)
if err != nil {
return fmt.Errorf("parse server endpoint: %w", err)
}
if parsedURL.Scheme == "wss" || parsedURL.Scheme == "https" {
tlsConfig, err = s.config.Server.TLSSetting.LoadTLSConfig(context.Background())
if err != nil {
return err
}
}
logger := newLoggerFromZap(s.telemetrySettings.Logger, "opamp-client")
switch parsedURL.Scheme {
case "ws", "wss":
s.opampClient = client.NewWebSocket(logger)
case "http", "https":
s.opampClient = client.NewHTTP(logger)
default:
return fmt.Errorf("unsupported scheme in server endpoint: %q", parsedURL.Scheme)
}
s.telemetrySettings.Logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint), zap.Any("headers", s.config.Server.Headers))
settings := types.StartSettings{
OpAMPServerURL: s.config.Server.Endpoint,
Header: s.config.Server.Headers,
TLSConfig: tlsConfig,
InstanceUid: types.InstanceUid(s.persistentState.InstanceID),
Callbacks: types.Callbacks{
OnConnect: func(_ context.Context) {
s.telemetrySettings.Logger.Debug("Connected to the server.")
},
OnConnectFailed: func(_ context.Context, err error) {
s.telemetrySettings.Logger.Error("Failed to connect to the server", zap.Error(err))
},
OnError: func(_ context.Context, err *protobufs.ServerErrorResponse) {
s.telemetrySettings.Logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage))
},
OnMessage: s.onMessage,
OnOpampConnectionSettings: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
//nolint:errcheck
go s.onOpampConnectionSettings(ctx, settings)
return nil
},
OnCommand: func(_ context.Context, command *protobufs.ServerToAgentCommand) error {
cmdType := command.GetType()
if *cmdType.Enum() == protobufs.CommandType_CommandType_Restart {
return s.handleRestartCommand()
}
return nil
},
SaveRemoteConfigStatus: func(_ context.Context, _ *protobufs.RemoteConfigStatus) {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079
},
GetEffectiveConfig: func(_ context.Context) (*protobufs.EffectiveConfig, error) {
return s.createEffectiveConfigMsg(), nil
},
},
Capabilities: s.config.Capabilities.SupportedCapabilities(),
}
ad := s.agentDescription.Load().(*protobufs.AgentDescription)
if err = s.opampClient.SetAgentDescription(ad); err != nil {
return err
}
if err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false}); err != nil {
return err
}
if ac, ok := s.availableComponents.Load().(*protobufs.AvailableComponents); ok && ac != nil {
if err = s.opampClient.SetAvailableComponents(ac); err != nil {
return err
}
}
s.telemetrySettings.Logger.Debug("Starting OpAMP client...")
if err = s.opampClient.Start(context.Background(), settings); err != nil {
return err
}
s.telemetrySettings.Logger.Debug("OpAMP client started.")
return nil
}
// startOpAMPServer starts an OpAMP server that communicates with
// the OpAMP extension running inside a Collector to receive
// data from inside the Collector. The internal server's lifetime is not
// tied to the Collector's process; it may be restarted
// depending on information the Supervisor receives from the remote
// OpAMP server.
func (s *Supervisor) startOpAMPServer() error {
s.opampServer = server.New(newLoggerFromZap(s.telemetrySettings.Logger, "opamp-server"))
var err error
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
return err
}
s.telemetrySettings.Logger.Debug("Starting OpAMP server...")
connected := &atomic.Bool{}
err = s.opampServer.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnecting: func(_ *http.Request) (bool, int) {
// Only allow one agent to be connected to this server at a time.
alreadyConnected := connected.Swap(true)
return !alreadyConnected, http.StatusConflict
},
onMessage: s.handleAgentOpAMPMessage,
onConnectionClose: func(_ serverTypes.Connection) {
connected.Store(false)
},
}.toServerSettings())
if err != nil {
return err
}
s.telemetrySettings.Logger.Debug("OpAMP server started.")
return nil
}
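// handleAgentOpAMPMessage processes a message from the Collector's OpAMP
// extension: it records the agent description, effective config, and health
// status, and proxies custom capabilities and custom messages up to the remote
// OpAMP server.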
func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
s.agentConn.Store(conn)
s.telemetrySettings.Logger.Debug("Received OpAMP message from the agent")
if message.AgentDescription != nil {
s.setAgentDescription(message.AgentDescription)
}
if message.EffectiveConfig != nil {
if cfg, ok := message.EffectiveConfig.GetConfigMap().GetConfigMap()[""]; ok {
s.telemetrySettings.Logger.Debug("Received effective config from agent")
s.effectiveConfig.Store(string(cfg.Body))
err := s.opampClient.UpdateEffectiveConfig(context.Background())
if err != nil {
s.telemetrySettings.Logger.Error("The OpAMP client failed to update the effective config", zap.Error(err))
}
} else {
s.telemetrySettings.Logger.Error("Got effective config message, but the instance config was not present. Ignoring effective config.")
}
}
// Proxy client capabilities to server
if message.CustomCapabilities != nil {
err := s.opampClient.SetCustomCapabilities(message.CustomCapabilities)
if err != nil {
s.telemetrySettings.Logger.Error("Failed to send custom capabilities to OpAMP server", zap.Error(err))
}
}
// Proxy agent custom messages to server
if message.CustomMessage != nil {
select {
case s.customMessageToServer <- message.CustomMessage:
default:
s.telemetrySettings.Logger.Warn(
"Buffer full, skipping forwarding custom message to server",
zap.String("capability", message.CustomMessage.Capability),
zap.String("type", message.CustomMessage.Type),
)
}
}
if message.Health != nil {
s.telemetrySettings.Logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy))
s.lastHealthFromClient = message.Health
}
return &protobufs.ServerToAgent{}
}
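// forwardCustomMessagesToServerLoop drains customMessageToServer, forwarding each
// message to the remote OpAMP server and waiting for any pending custom message
// to finish sending before retrying.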
func (s *Supervisor) forwardCustomMessagesToServerLoop() {
for {
select {
case cm := <-s.customMessageToServer:
for {
sendingChan, err := s.opampClient.SendCustomMessage(cm)
switch {
case errors.Is(err, types.ErrCustomMessagePending):
s.telemetrySettings.Logger.Debug("Custom message pending, waiting to send...")
<-sendingChan
continue
case err == nil: // OK
s.telemetrySettings.Logger.Debug("Custom message forwarded to server.")
default:
s.telemetrySettings.Logger.Error("Failed to send custom message to OpAMP server", zap.Error(err))
}
break
}
case <-s.doneChan:
return
}
}
}
// setAgentDescription sets the agent description, merging in any user-specified attributes from the supervisor configuration.
func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) {
ad.IdentifyingAttributes = applyKeyValueOverrides(s.config.Agent.Description.IdentifyingAttributes, ad.IdentifyingAttributes)
ad.NonIdentifyingAttributes = applyKeyValueOverrides(s.config.Agent.Description.NonIdentifyingAttributes, ad.NonIdentifyingAttributes)
s.agentDescription.Store(ad)
}
// setAvailableComponents sets the available components of the OpAMP agent
func (s *Supervisor) setAvailableComponents(ac *protobufs.AvailableComponents) {
s.availableComponents.Store(ac)
}
// applyKeyValueOverrides merges the overrides map into the array of key value pairs.
// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map.
// An array of KeyValue pair is returned, with each key value pair having a distinct key.
func applyKeyValueOverrides(overrides map[string]string, orig []*protobufs.KeyValue) []*protobufs.KeyValue {
kvMap := make(map[string]*protobufs.KeyValue, len(orig)+len(overrides))
for _, kv := range orig {
kvMap[kv.Key] = kv
}
for k, v := range overrides {
kvMap[k] = &protobufs.KeyValue{
Key: k,
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: v,
},
},
}
}
// Sort keys for stable output, makes it easier to test.
keys := make([]string, 0, len(kvMap))
for k := range kvMap {
keys = append(keys, k)
}
sort.Strings(keys)
kvOut := make([]*protobufs.KeyValue, 0, len(kvMap))
for _, k := range keys {
v := kvMap[k]
kvOut = append(kvOut, v)
}
return kvOut
}
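// stopOpAMPClient stops the OpAMP client, bounding shutdown with a 5-second timeout.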
func (s *Supervisor) stopOpAMPClient() error {
s.telemetrySettings.Logger.Debug("Stopping OpAMP client...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.opampClient.Stop(ctx)
// TODO(srikanthccv): remove context.DeadlineExceeded after https://github.com/open-telemetry/opamp-go/pull/213
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
s.telemetrySettings.Logger.Debug("OpAMP client stopped.")
return nil
}
func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header {
headers := make(http.Header)
for _, header := range protoHeaders.Headers {
headers.Add(header.Key, header.Value)
}
return headers
}
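// onOpampConnectionSettings applies new OpAMP connection settings offered by the
// server. It validates the settings, restarts the OpAMP client against the new
// endpoint, and falls back to the previous settings if the new connection cannot
// be established.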
func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *protobufs.OpAMPConnectionSettings) error {
if settings == nil {
s.telemetrySettings.Logger.Debug("Received ConnectionSettings request with nil settings")
return nil
}
newServerConfig := config.OpAMPServer{}
if settings.DestinationEndpoint != "" {
newServerConfig.Endpoint = settings.DestinationEndpoint
}
if settings.Headers != nil {
newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers)
}
if settings.Certificate != nil {
if len(settings.Certificate.CaCert) != 0 {
newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaCert)
}
if len(settings.Certificate.Cert) != 0 {
newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.Cert)
}
if len(settings.Certificate.PrivateKey) != 0 {
newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey)
}
} else {
newServerConfig.TLSSetting = configtls.NewDefaultClientConfig()
newServerConfig.TLSSetting.InsecureSkipVerify = true
}
if err := newServerConfig.Validate(); err != nil {
s.telemetrySettings.Logger.Error("New OpAMP settings resulted in invalid configuration", zap.Error(err))
return err
}
if err := s.stopOpAMPClient(); err != nil {
s.telemetrySettings.Logger.Error("Cannot stop the OpAMP client", zap.Error(err))
return err
}
// take a copy of the current OpAMP server config
oldServerConfig := s.config.Server
// update the OpAMP server config
s.config.Server = newServerConfig
if err := s.startOpAMPClient(); err != nil {
s.telemetrySettings.Logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err))
// revert the OpAMP server config
s.config.Server = oldServerConfig
// start the OpAMP client with the old settings
if err := s.startOpAMPClient(); err != nil {
s.telemetrySettings.Logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err))
return err
}
}
return nil
}
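// composeNoopPipeline renders the noop pipeline template, which gives the
// Collector a minimal pipeline to run when no real config is available. If the
// Collector supports the service.AllowNoPipelines gate, no pipeline is needed
// and an empty config is returned.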
func (s *Supervisor) composeNoopPipeline() ([]byte, error) {
var cfg bytes.Buffer
if !s.isFeatureGateSupported(AllowNoPipelinesFeatureGate) {
err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{})
if err != nil {
return nil, err
}
}
return cfg.Bytes(), nil
}
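// composeNoopConfig merges the noop pipeline with the OpAMP extension config,
// producing the minimal config used to bootstrap the Collector.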
func (s *Supervisor) composeNoopConfig() ([]byte, error) {
k := koanf.New("::")
cfg, err := s.composeNoopPipeline()
if err != nil {
return nil, err
}
if err = k.Load(rawbytes.Provider(cfg), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil {
return nil, err
}
if err = k.Load(rawbytes.Provider(s.composeOpAMPExtensionConfig()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil {
return nil, err
}
return k.Marshal(yaml.Parser())
}
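// composeExtraLocalConfig renders the supervisor-owned local config section:
// the health check endpoint, resource attributes derived from the agent
// description, and the Supervisor's OpAMP server port.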
func (s *Supervisor) composeExtraLocalConfig() []byte {
var cfg bytes.Buffer
resourceAttrs := map[string]string{}
ad := s.agentDescription.Load().(*protobufs.AgentDescription)
for _, attr := range ad.IdentifyingAttributes {
resourceAttrs[attr.Key] = attr.Value.GetStringValue()
}
for _, attr := range ad.NonIdentifyingAttributes {
resourceAttrs[attr.Key] = attr.Value.GetStringValue()
}
tplVars := map[string]any{
"Healthcheck": s.agentHealthCheckEndpoint,
"ResourceAttributes": resourceAttrs,
"SupervisorPort": s.opampServerPort,
}
err := s.extraConfigTemplate.Execute(
&cfg,
tplVars,
)
if err != nil {
s.telemetrySettings.Logger.Error("Could not compose local config", zap.Error(err))
return nil
}
return cfg.Bytes()
}
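// composeOpAMPExtensionConfig renders the config for the Collector's OpAMP
// extension, pointing it at the Supervisor's OpAMP server and configuring
// orphan detection against the Supervisor's PID.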
func (s *Supervisor) composeOpAMPExtensionConfig() []byte {
orphanPollInterval := 5 * time.Second
if s.config.Agent.OrphanDetectionInterval > 0 {
orphanPollInterval = s.config.Agent.OrphanDetectionInterval
}
var cfg bytes.Buffer
tplVars := map[string]any{
"InstanceUid": s.persistentState.InstanceID.String(),
"SupervisorPort": s.opampServerPort,
"PID": s.pidProvider.PID(),
"PPIDPollInterval": orphanPollInterval,
"ReportsAvailableComponents": s.config.Capabilities.ReportsAvailableComponents,
}
err := s.opampextensionTemplate.Execute(
&cfg,
tplVars,
)
if err != nil {
s.telemetrySettings.Logger.Error("Could not compose local config", zap.Error(err))
return nil
}
return cfg.Bytes()
}
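// loadAndWriteInitialMergedConfig loads the last received remote config and own
// telemetry settings from disk (if the corresponding capabilities are enabled),
// composes the initial merged config, and writes it to the agent's config file.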
func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
var lastRecvRemoteConfig, lastRecvOwnTelemetryConfig []byte
var err error
if s.config.Capabilities.AcceptsRemoteConfig {
// Try to load the last received remote config if it exists.
lastRecvRemoteConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile))
switch {
case err == nil:
config := &protobufs.AgentRemoteConfig{}
err = proto.Unmarshal(lastRecvRemoteConfig, config)
if err != nil {
s.telemetrySettings.Logger.Error("Cannot parse last received remote config", zap.Error(err))
} else {
s.remoteConfig = config
}
case errors.Is(err, os.ErrNotExist):
s.telemetrySettings.Logger.Info("No last received remote config found")
default:
s.telemetrySettings.Logger.Error("error while reading last received config", zap.Error(err))
}
} else {
s.telemetrySettings.Logger.Debug("Remote config is not supported, will not attempt to load config from file")
}
if s.config.Capabilities.ReportsOwnMetrics || s.config.Capabilities.ReportsOwnTraces || s.config.Capabilities.ReportsOwnLogs {
// Try to load the last received own metrics config if it exists.
lastRecvOwnTelemetryConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvOwnTelemetryConfigFile))
if err == nil {
set := &protobufs.ConnectionSettingsOffers{}
err = proto.Unmarshal(lastRecvOwnTelemetryConfig, set)
if err != nil {
s.telemetrySettings.Logger.Error("Cannot parse last received own telemetry config", zap.Error(err))
} else {
s.setupOwnTelemetry(context.Background(), set)
}
}
} else {
s.telemetrySettings.Logger.Debug("Own telemetry is not supported, will not attempt to load config from file")
}
_, err = s.composeMergedConfig(s.remoteConfig)
if err != nil {
return fmt.Errorf("could not compose initial merged config: %w", err)
}
// write the initial merged config to disk
cfgState := s.cfgState.Load().(*configState)
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0o600); err != nil {
s.telemetrySettings.Logger.Error("Failed to write agent config.", zap.Error(err))
}
return nil
}
// createEffectiveConfigMsg creates an EffectiveConfig message with the content of the
// current effective config.
func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig {
cfgStr, ok := s.effectiveConfig.Load().(string)
if !ok {
cfgState, ok := s.cfgState.Load().(*configState)
if !ok {
cfgStr = ""
} else {
cfgStr = cfgState.mergedConfig
}
}
cfg := &protobufs.EffectiveConfig{
ConfigMap: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: []byte(cfgStr)},
},
},
}
return cfg
}
func (s *Supervisor) updateOwnTelemetryData(data map[string]any, signal string, settings *protobufs.TelemetryConnectionSettings) map[string]any {
if settings == nil || len(settings.DestinationEndpoint) == 0 {
return data
}
data[fmt.Sprintf("%sEndpoint", signal)] = settings.DestinationEndpoint
data[fmt.Sprintf("%sHeaders", signal)] = []protobufs.Header{}
if settings.Headers != nil {
data[fmt.Sprintf("%sHeaders", signal)] = settings.Headers.Headers
}
return data
}
func (s *Supervisor) setupOwnTelemetry(_ context.Context, settings *protobufs.ConnectionSettingsOffers) (configChanged bool) {
var cfg bytes.Buffer
data := s.updateOwnTelemetryData(map[string]any{}, "Metrics", settings.GetOwnMetrics())
data = s.updateOwnTelemetryData(data, "Logs", settings.GetOwnLogs())
data = s.updateOwnTelemetryData(data, "Traces", settings.GetOwnTraces())
if len(data) == 0 {
s.telemetrySettings.Logger.Debug("Disabling own telemetry pipeline in the config")
} else {
err := s.ownTelemetryTemplate.Execute(&cfg, data)
if err != nil {
s.telemetrySettings.Logger.Error("Could not setup own telemetry", zap.Error(err))
return
}
}
s.agentConfigOwnMetricsSection.Store(cfg.String())
// Need to recalculate the Agent config so that the metric config is included in it.
configChanged, err := s.composeMergedConfig(s.remoteConfig)
if err != nil {
s.telemetrySettings.Logger.Error("Error composing merged config for own metrics. Ignoring agent self metrics config", zap.Error(err))
return
}
return configChanged
}
// composeMergedConfig composes the merged config from multiple sources:
// 1) the remote config from OpAMP Server
// 2) the own metrics config section
// 3) the local override config that is hard-coded in the Supervisor.
func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) {
k := koanf.New("::")
configMapIsEmpty := len(config.GetConfig().GetConfigMap()) == 0
if !configMapIsEmpty {
c := config.GetConfig()
// Sort to make sure the order of merging is stable.
var names []string
for name := range c.ConfigMap {
if name == "" {
// skip instance config
continue
}
names = append(names, name)
}
sort.Strings(names)
// Append instance config as the last item.
names = append(names, "")
// Merge received configs.
for _, name := range names {
item := c.ConfigMap[name]
if item == nil {
continue
}
k2 := koanf.New("::")
err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser())
if err != nil {
return false, fmt.Errorf("cannot parse config named %s: %w", name, err)
}
err = k.Merge(k2)
if err != nil {
return false, fmt.Errorf("cannot merge config named %s: %w", name, err)
}
}
} else {
// Add noop pipeline
var noopConfig []byte
noopConfig, err = s.composeNoopPipeline()
if err != nil {
return false, fmt.Errorf("could not compose noop pipeline: %w", err)
}
if err = k.Load(rawbytes.Provider(noopConfig), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil {
return false, fmt.Errorf("could not merge noop pipeline: %w", err)
}
}
// Merge own metrics config.
ownMetricsCfg, ok := s.agentConfigOwnMetricsSection.Load().(string)
if ok {
if err = k.Load(rawbytes.Provider([]byte(ownMetricsCfg)), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil {
return false, err
}
}
// Merge local config last since it has the highest precedence.
if err = k.Load(rawbytes.Provider(s.composeExtraLocalConfig()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil {
return false, err
}
if err = k.Load(rawbytes.Provider(s.composeOpAMPExtensionConfig()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil {
return false, err
}
// The merged final result is our new merged config.
newMergedConfigBytes, err := k.Marshal(yaml.Parser())
if err != nil {
return false, err
}
// Check if supervisor's merged config is changed.
newConfigState := &configState{
mergedConfig: string(newMergedConfigBytes),
configMapIsEmpty: configMapIsEmpty,
}
configChanged = false
oldConfigState := s.cfgState.Swap(newConfigState)
if oldConfigState == nil || !oldConfigState.(*configState).equal(newConfigState) {
s.telemetrySettings.Logger.Debug("Merged config changed.")
configChanged = true
}
return configChanged, nil
}
func (s *Supervisor) handleRestartCommand() error {
s.agentRestarting.Store(true)
defer s.agentRestarting.Store(false)
s.telemetrySettings.Logger.Debug("Received restart command")
err := s.commander.Restart(context.Background())
if err != nil {
s.telemetrySettings.Logger.Error("Could not restart agent process", zap.Error(err))
}
return err
}
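// startAgent launches the Collector process with the current merged config. If
// the config map is empty, the agent is intentionally not started and
// agentNotStarting is returned.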
func (s *Supervisor) startAgent() (agentStartStatus, error) {
if s.cfgState.Load().(*configState).configMapIsEmpty {
// Don't start the agent if there is no config to run
s.telemetrySettings.Logger.Info("No config present, not starting agent.")
// need to manually trigger updating effective config
err := s.opampClient.UpdateEffectiveConfig(context.Background())
if err != nil {
s.telemetrySettings.Logger.Error("The OpAMP client failed to update the effective config", zap.Error(err))
}
return agentNotStarting, nil
}
err := s.commander.Start(context.Background())
if err != nil {
s.telemetrySettings.Logger.Error("Cannot start the agent", zap.Error(err))
startErr := fmt.Errorf("cannot start the agent: %w", err)
err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: startErr.Error()})
if err != nil {
s.telemetrySettings.Logger.Error("Failed to report OpAMP client health", zap.Error(err))
}
return "", startErr
}
s.agentHasStarted = false
s.agentStartHealthCheckAttempts = 0
s.startedAt = time.Now()
s.startHealthCheckTicker()
s.healthChecker = healthchecker.NewHTTPHealthChecker(fmt.Sprintf("http://%s", s.agentHealthCheckEndpoint))
return agentStarting, nil
}
func (s *Supervisor) startHealthCheckTicker() {
// Prepare health checker
healthCheckBackoff := backoff.NewExponentialBackOff()
healthCheckBackoff.MaxInterval = 60 * time.Second
healthCheckBackoff.MaxElapsedTime = 0 // Never stop
if s.healthCheckTicker != nil {
s.healthCheckTicker.Stop()
}
s.healthCheckTicker = backoff.NewTicker(healthCheckBackoff)
}
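// healthCheck probes the Collector's health check endpoint and reports the
// result to the OpAMP server, tolerating failures during the agent's startup
// window and suppressing reports when the status is unchanged.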
func (s *Supervisor) healthCheck() {
if !s.commander.IsRunning() {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
err := s.healthChecker.Check(ctx)
cancel()
// Prepare OpAMP health report.
health := &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(s.startedAt.UnixNano()),
}
if err != nil {
health.Healthy = false
if !s.agentHasStarted && s.agentStartHealthCheckAttempts < 10 {
health.LastError = "Agent is starting"
s.agentStartHealthCheckAttempts++
// if we have a last health status, use it
if s.lastHealth != nil && s.lastHealth.Healthy {
health.Healthy = s.lastHealth.Healthy
}
} else {
health.LastError = err.Error()
s.telemetrySettings.Logger.Error("Agent is not healthy", zap.Error(err))
}
} else {
s.agentHasStarted = true
health.Healthy = true
s.telemetrySettings.Logger.Debug("Agent is healthy.")
}
s.lastHealth = health
if err != nil && errors.Is(err, s.lastHealthCheckErr) {
// No difference from last check. Nothing new to report.
return
}
// Report via OpAMP.
if err2 := s.opampClient.SetHealth(health); err2 != nil {
s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(err2))
return
}
s.lastHealthCheckErr = err
}
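// runAgentProcess is the Supervisor's main control loop for the Collector
// process: it (re)starts the agent when a new config arrives, restarts it five
// seconds after an unexpected exit, enforces the config apply timeout, runs
// periodic health checks, and stops the agent on shutdown.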
func (s *Supervisor) runAgentProcess() {
if _, err := os.Stat(s.agentConfigFilePath()); err == nil {
// We have an effective config file saved previously. Use it to start the agent.
s.telemetrySettings.Logger.Debug("Effective config found, starting agent for the first time")
_, err := s.startAgent()
if err != nil {
s.telemetrySettings.Logger.Error("starting agent failed", zap.Error(err))
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error())
}
}
restartTimer := time.NewTimer(0)
restartTimer.Stop()
configApplyTimeoutTimer := time.NewTimer(0)
configApplyTimeoutTimer.Stop()
for {
select {
case <-s.hasNewConfig:
s.lastHealthFromClient = nil
if !configApplyTimeoutTimer.Stop() {
select {
case <-configApplyTimeoutTimer.C: // Try to drain the channel
default:
}
}
configApplyTimeoutTimer.Reset(s.config.Agent.ConfigApplyTimeout)
s.telemetrySettings.Logger.Debug("Restarting agent due to new config")
restartTimer.Stop()
s.stopAgentApplyConfig()
status, err := s.startAgent()
if err != nil {
s.telemetrySettings.Logger.Error("starting agent with new config failed", zap.Error(err))
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error())
}
if status == agentNotStarting {
// not starting agent because of noop config, clear timer
configApplyTimeoutTimer.Stop()
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, "")
}
case <-s.commander.Exited():
// the agent process exit is expected for restart command and will not attempt to restart
if s.agentRestarting.Load() {
continue
}
s.telemetrySettings.Logger.Debug("Agent process exited unexpectedly. Will restart in a bit...", zap.Int("pid", s.commander.Pid()), zap.Int("exit_code", s.commander.ExitCode()))
errMsg := fmt.Sprintf(
"Agent process PID=%d exited unexpectedly, exit code=%d. Will restart in a bit...",
s.commander.Pid(), s.commander.ExitCode(),
)
err := s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: errMsg})
if err != nil {
s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(err))
}
// TODO: decide why the agent stopped. If it was due to bad config, report it to server.
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079
// Wait 5 seconds before starting again.
if !restartTimer.Stop() {
select {
case <-restartTimer.C: // Try to drain the channel
default:
}
}
restartTimer.Reset(5 * time.Second)
case <-restartTimer.C:
s.telemetrySettings.Logger.Debug("Agent starting after start backoff")
_, err := s.startAgent()
if err != nil {
s.telemetrySettings.Logger.Error("restarting agent failed", zap.Error(err))
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error())
}
case <-configApplyTimeoutTimer.C:
if s.lastHealthFromClient == nil || !s.lastHealthFromClient.Healthy {
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, "Config apply timeout exceeded")
} else {
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, "")
}
case <-s.healthCheckTicker.C:
s.healthCheck()
case <-s.doneChan:
err := s.commander.Stop(context.Background())
if err != nil {
s.telemetrySettings.Logger.Error("Could not stop agent process", zap.Error(err))
}
return
}
}
}
func (s *Supervisor) stopAgentApplyConfig() {
s.telemetrySettings.Logger.Debug("Stopping the agent to apply new config")
cfgState := s.cfgState.Load().(*configState)
err := s.commander.Stop(context.Background())
if err != nil {
s.telemetrySettings.Logger.Error("Could not stop agent process", zap.Error(err))
}
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0o600); err != nil {
s.telemetrySettings.Logger.Error("Failed to write agent config.", zap.Error(err))
}
}
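// Shutdown stops the Supervisor: it shuts down the agent process loop, the
// custom message forwarder, the local OpAMP server, and the client connection
// to the remote OpAMP server (reporting unhealthy on the way out), then the
// Supervisor's own telemetry.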
func (s *Supervisor) Shutdown() {
s.telemetrySettings.Logger.Debug("Supervisor shutting down...")
close(s.doneChan)
// Shutdown in order from producer to consumer (agent -> customMessageForwarder -> local OpAMP server -> client to remote OpAMP server).
s.agentWG.Wait()
s.customMessageWG.Wait()
if s.opampServer != nil {
s.telemetrySettings.Logger.Debug("Stopping OpAMP server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.opampServer.Stop(ctx)
if err != nil {
s.telemetrySettings.Logger.Error("Could not stop the OpAMP Server")
} else {
s.telemetrySettings.Logger.Debug("OpAMP server stopped.")
}
}
if s.opampClient != nil {
err := s.opampClient.SetHealth(
&protobufs.ComponentHealth{
Healthy: false, LastError: "Supervisor is shut down",
},
)
if err != nil {
s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(err))
}
err = s.stopOpAMPClient()
if err != nil {
s.telemetrySettings.Logger.Error("Could not stop the OpAMP client", zap.Error(err))
}
}
if err := s.shutdownTelemetry(); err != nil {
s.telemetrySettings.Logger.Error("Could not shut down self telemetry", zap.Error(err))
}
if s.healthCheckTicker != nil {
s.healthCheckTicker.Stop()
}
}
func (s *Supervisor) shutdownTelemetry() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The metric.MeterProvider and trace.TracerProvider interfaces do not have a Shutdown method.
// To shut down the providers we assert to this interface, which matches the type signature used in the SDK.
type shutdownable interface {
Shutdown(context.Context) error
}
var err error
if prov, ok := s.telemetrySettings.MeterProvider.(shutdownable); ok {
if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown meter provider: %w", shutdownErr))
}
}
if prov, ok := s.telemetrySettings.TracerProvider.(shutdownable); ok {
if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown tracer provider: %w", shutdownErr))
}
}
if prov, ok := s.telemetrySettings.loggerProvider.(shutdownable); ok {
if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown logger provider: %w", shutdownErr))
}
}
return err
}
func (s *Supervisor) saveLastReceivedConfig(config *protobufs.AgentRemoteConfig) error {
cfg, err := proto.Marshal(config)
if err != nil {
return err
}
return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0o600)
}
func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.ConnectionSettingsOffers, filePath string) error {
cfg, err := proto.Marshal(set)
if err != nil {
return err
}
return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0o600)
}
func (s *Supervisor) reportConfigStatus(status protobufs.RemoteConfigStatuses, errorMessage string) {
if !s.config.Capabilities.ReportsRemoteConfig {
s.telemetrySettings.Logger.Debug("Supervisor is not configured to report remote config status")
}
err := s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: s.remoteConfig.GetConfigHash(),
Status: status,
ErrorMessage: errorMessage,
})
if err != nil {
s.telemetrySettings.Logger.Error("Could not report OpAMP remote config status", zap.Error(err))
}
}
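// onMessage handles a message from the remote OpAMP server: it processes agent
// identification, remote config, and own-telemetry settings (recomposing the
// merged config and signaling an agent restart when the config changed), and
// proxies custom capabilities and custom messages down to the Collector's
// OpAMP extension.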
func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
configChanged := false
if msg.AgentIdentification != nil {
configChanged = s.processAgentIdentificationMessage(msg.AgentIdentification) || configChanged
}
if msg.RemoteConfig != nil {
configChanged = s.processRemoteConfigMessage(msg.RemoteConfig) || configChanged
}
if msg.OwnMetricsConnSettings != nil || msg.OwnTracesConnSettings != nil || msg.OwnLogsConnSettings != nil {
configChanged = s.processOwnTelemetryConnSettingsMessage(ctx, &protobufs.ConnectionSettingsOffers{
OwnMetrics: msg.OwnMetricsConnSettings,
OwnTraces: msg.OwnTracesConnSettings,
OwnLogs: msg.OwnLogsConnSettings,
}) || configChanged
}
// Update the agent config if any messages have touched the config
if configChanged {
err := s.opampClient.UpdateEffectiveConfig(ctx)
if err != nil {
s.telemetrySettings.Logger.Error("The OpAMP client failed to update the effective config", zap.Error(err))
}
s.telemetrySettings.Logger.Debug("Config has changed. Signaling the agent to restart")
// Signal that there is a new config.
select {
case s.hasNewConfig <- struct{}{}:
default:
}
}
messageToAgent := &protobufs.ServerToAgent{
InstanceUid: s.persistentState.InstanceID[:],
}
haveMessageForAgent := false
// Proxy server capabilities to opamp extension
if msg.CustomCapabilities != nil {
messageToAgent.CustomCapabilities = msg.CustomCapabilities
haveMessageForAgent = true
}
// Proxy server messages to opamp extension
if msg.CustomMessage != nil {
messageToAgent.CustomMessage = msg.CustomMessage
haveMessageForAgent = true
}
// Send any messages that need proxying to the agent.
if haveMessageForAgent {
conn, ok := s.agentConn.Load().(serverTypes.Connection)
if ok {
err := conn.Send(ctx, messageToAgent)
if err != nil {
s.telemetrySettings.Logger.Error("Error forwarding message to agent from server", zap.Error(err))
}
}
}
}
// processRemoteConfigMessage processes an AgentRemoteConfig message, returning true if the agent config has changed.
func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig) bool {
if err := s.saveLastReceivedConfig(msg); err != nil {
s.telemetrySettings.Logger.Error("Could not save last received remote config", zap.Error(err))
}
s.remoteConfig = msg
s.telemetrySettings.Logger.Debug("Received remote config from server", zap.String("hash", fmt.Sprintf("%x", s.remoteConfig.ConfigHash)))
var err error
configChanged, err := s.composeMergedConfig(s.remoteConfig)
if err != nil {
s.telemetrySettings.Logger.Error("Error composing merged config. Reporting failed remote config status.", zap.Error(err))
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error())
} else {
s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING, "")
}
return configChanged
}
// processOwnTelemetryConnSettingsMessage processes a TelemetryConnectionSettings message, returning true if the agent config has changed.
func (s *Supervisor) processOwnTelemetryConnSettingsMessage(ctx context.Context, msg *protobufs.ConnectionSettingsOffers) bool {
if err := s.saveLastReceivedOwnTelemetrySettings(msg, lastRecvOwnTelemetryConfigFile); err != nil {
s.telemetrySettings.Logger.Error("Could not save last received own telemetry settings", zap.Error(err))
}
return s.setupOwnTelemetry(ctx, msg)
}
// processAgentIdentificationMessage processes an AgentIdentification message, returning true if the agent config has changed.
func (s *Supervisor) processAgentIdentificationMessage(msg *protobufs.AgentIdentification) bool {
newInstanceID, err := uuid.FromBytes(msg.NewInstanceUid)
if err != nil {
s.telemetrySettings.Logger.Error("Failed to parse instance UUID", zap.Error(err))
return false
}
s.telemetrySettings.Logger.Debug("Agent identity is changing",
zap.String("old_id", s.persistentState.InstanceID.String()),
zap.String("new_id", newInstanceID.String()))
err = s.persistentState.SetInstanceID(newInstanceID)
if err != nil {
s.telemetrySettings.Logger.Error("Failed to persist new instance ID, instance ID will revert on restart.", zap.String("new_id", newInstanceID.String()), zap.Error(err))
}
err = s.opampClient.SetAgentDescription(s.agentDescription.Load().(*protobufs.AgentDescription))
if err != nil {
s.telemetrySettings.Logger.Error("Failed to send agent description to OpAMP server", zap.Error(err))
}
// Need to recalculate the Agent config so that the new agent identification is included in it.
configChanged, err := s.composeMergedConfig(s.remoteConfig)
if err != nil {
s.telemetrySettings.Logger.Error("Error composing merged config with new instance ID", zap.Error(err))
return false
}
return configChanged
}
func (s *Supervisor) persistentStateFilePath() string {
return filepath.Join(s.config.Storage.Directory, persistentStateFileName)
}
func (s *Supervisor) agentConfigFilePath() string {
return filepath.Join(s.config.Storage.Directory, agentConfigFileName)
}
func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) {
if s.config.Agent.OpAMPServerPort != 0 {
return s.config.Agent.OpAMPServerPort, nil
}
return s.findRandomPort()
}
func (s *Supervisor) getFeatureGateFlag() []string {
flags := []string{}
for k := range s.featureGates {
flags = append(flags, k)
}
if len(flags) == 0 {
return []string{}
}
return []string{"--feature-gates", strings.Join(flags, ",")}
}
func (s *Supervisor) isFeatureGateSupported(gate string) bool {
_, ok := s.featureGates[gate]
return ok
}
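// findRandomPort asks the OS for a free TCP port on localhost by binding to
// port 0 and immediately closing the listener.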
func (s *Supervisor) findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return 0, err
}
port := l.Addr().(*net.TCPAddr).Port
err = l.Close()
if err != nil {
return 0, err
}
return port, nil
}
func (s *Supervisor) getTracer() trace.Tracer {
tracer := s.telemetrySettings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor")
return tracer
}
// The default koanf behavior is to override lists in the config.
// Instead, we provide this function, which merges the source and destination config's
// extension lists by concatenating the two.
// Will be resolved by https://github.com/open-telemetry/opentelemetry-collector/issues/8754
func configMergeFunc(src, dest map[string]any) error {
srcExtensions := maps.Search(src, []string{"service", "extensions"})
destExtensions := maps.Search(dest, []string{"service", "extensions"})
maps.Merge(src, dest)
if destExt, ok := destExtensions.([]any); ok {
if srcExt, ok := srcExtensions.([]any); ok {
if service, ok := dest["service"].(map[string]any); ok {
service["extensions"] = append(destExt, srcExt...)
}
}
}
return nil
}