in cmd/opampsupervisor/supervisor/supervisor.go [437:613]
// getBootstrapInfo starts a one-shot OpAMP server and launches the Collector
// with a minimal "noop" config so the Collector's opamp extension can report
// its AgentDescription (and, when requested, its AvailableComponents) back to
// the Supervisor. Results are stored via setAgentDescription /
// setAvailableComponents. It returns an error (wrapping
// errNonMatchingInstanceUID) when the Collector's instance ID does not match
// the one assigned by the Supervisor, or when bootstrap does not complete
// within config.Agent.BootstrapTimeout. The named return err is also amended
// by deferred cleanup (server/collector stop failures are joined in).
func (s *Supervisor) getBootstrapInfo() (err error) {
	_, span := s.getTracer().Start(context.Background(), "GetBootstrapInfo")
	defer span.End()

	s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not get supervisor opamp service port: %v", err))
		return err
	}

	// Minimal bootstrap config: just enough for the Collector to start with
	// the opamp extension pointed at the one-shot server below.
	bootstrapConfig, err := s.composeNoopConfig()
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not compose noop config: %v", err))
		return err
	}

	err = os.WriteFile(s.agentConfigFilePath(), bootstrapConfig, 0o600)
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Failed to write agent config: %v", err))
		return fmt.Errorf("failed to write agent config: %w", err)
	}

	srv := server.New(newLoggerFromZap(s.telemetrySettings.Logger, "opamp-server"))

	// done carries the bootstrap outcome; buffer of 1 so a single send from
	// the onMessage callback never blocks.
	done := make(chan error, 1)
	var connected atomic.Bool
	var doneReported atomic.Bool

	// Start a one-shot server to get the Collector's agent description
	// and available components using the Collector's OpAMP extension.
	err = srv.Start(flattenedSettings{
		endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
		onConnecting: func(_ *http.Request) (bool, int) {
			connected.Store(true)
			return true, http.StatusOK
		},
		onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
			response := &protobufs.ServerToAgent{}
			if message.GetAvailableComponents() != nil {
				s.setAvailableComponents(message.AvailableComponents)
			}

			if message.AgentDescription != nil {
				instanceIDSeen := false
				s.setAgentDescription(message.AgentDescription)
				identAttr := message.AgentDescription.IdentifyingAttributes

				for _, attr := range identAttr {
					if attr.Key == semconv.AttributeServiceInstanceID {
						// The Collector must echo the instance ID the
						// Supervisor assigned to it; anything else is fatal.
						if attr.Value.GetStringValue() != s.persistentState.InstanceID.String() {
							done <- fmt.Errorf(
								"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s): %w",
								attr.Value.GetStringValue(),
								s.persistentState.InstanceID.String(),
								errNonMatchingInstanceUID,
							)
							return response
						}
						instanceIDSeen = true
					}
				}

				if !instanceIDSeen {
					done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
					return response
				}
			}

			// agent description must be defined
			_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
			if !ok {
				return response
			}

			// if available components have not been reported, agent description is sufficient to continue
			availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
			if availableComponentsOk {
				// must have a full list of components if available components have been reported
				if availableComponents.GetComponents() != nil {
					// CompareAndSwap makes the check-and-send atomic: exactly
					// one callback wins and sends. A plain Load/Store pair
					// could let two callbacks both pass the check, and the
					// second send on the 1-buffer channel would hang.
					if doneReported.CompareAndSwap(false, true) {
						done <- nil
					}
				} else {
					// if we don't have a full component list, ask for it
					response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
				}
				return response
			}

			// need to only report done once, not on each message - otherwise, we get a hung thread
			if doneReported.CompareAndSwap(false, true) {
				done <- nil
			}
			return response
		},
	}.toServerSettings())
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not start OpAMP server: %v", err))
		return err
	}
	defer func() {
		if stopErr := srv.Stop(context.Background()); stopErr != nil {
			err = errors.Join(err, fmt.Errorf("error when stopping the opamp server: %w", stopErr))
		}
	}()

	flags := []string{
		"--config", s.agentConfigFilePath(),
	}
	featuregateFlag := s.getFeatureGateFlag()
	if len(featuregateFlag) > 0 {
		flags = append(flags, featuregateFlag...)
	}
	cmd, err := commander.NewCommander(
		s.telemetrySettings.Logger,
		s.config.Storage.Directory,
		s.config.Agent,
		flags...,
	)
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
		return err
	}

	if err = cmd.Start(context.Background()); err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
		return err
	}
	defer func() {
		if stopErr := cmd.Stop(context.Background()); stopErr != nil {
			err = errors.Join(err, fmt.Errorf("error when stopping the collector: %w", stopErr))
		}
	}()

	select {
	case <-time.After(s.config.Agent.BootstrapTimeout):
		// Distinguish "never connected" from "connected but silent" so the
		// operator gets an actionable error.
		msg := "collector's OpAMP client never connected to the Supervisor"
		if connected.Load() {
			msg = "collector connected but never responded with an AgentDescription message"
		}
		span.SetStatus(codes.Error, msg)
		return errors.New(msg)
	case err = <-done:
		if errors.Is(err, errNonMatchingInstanceUID) {
			// try to report the issue to the OpAMP server
			if startOpAMPErr := s.startOpAMPClient(); startOpAMPErr == nil {
				defer func(s *Supervisor) {
					if stopErr := s.stopOpAMPClient(); stopErr != nil {
						s.telemetrySettings.Logger.Error("Could not stop OpAmp client", zap.Error(stopErr))
					}
				}(s)
				if healthErr := s.opampClient.SetHealth(&protobufs.ComponentHealth{
					Healthy:   false,
					LastError: err.Error(),
				}); healthErr != nil {
					s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(healthErr))
				}
			} else {
				s.telemetrySettings.Logger.Error("Could not start OpAMP client to report health to server", zap.Error(startOpAMPErr))
			}
		}

		if err != nil {
			s.telemetrySettings.Logger.Error("Could not complete bootstrap", zap.Error(err))
			span.SetStatus(codes.Error, err.Error())
		} else {
			span.SetStatus(codes.Ok, "")
		}
		return err
	}
}