func()

in cmd/opampsupervisor/supervisor/supervisor.go [437:613]


// getBootstrapInfo launches the agent once with a noop configuration and a
// short-lived local OpAMP server so that the Collector's opamp extension can
// report its AgentDescription (and, if supported, its AvailableComponents)
// back to the Supervisor before normal operation begins.
//
// The result is named so the deferred server/commander shutdown hooks can
// join their stop errors into the returned error.
func (s *Supervisor) getBootstrapInfo() (err error) {
	_, span := s.getTracer().Start(context.Background(), "GetBootstrapInfo")
	defer span.End()
	s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not get supervisor opamp service port: %v", err))
		return err
	}

	bootstrapConfig, err := s.composeNoopConfig()
	if err != nil {
		// Fixed duplicated word in the status message ("noop config config").
		span.SetStatus(codes.Error, fmt.Sprintf("Could not compose noop config: %v", err))
		return err
	}

	// 0o600: the rendered config may embed credentials; keep it owner-only.
	err = os.WriteFile(s.agentConfigFilePath(), bootstrapConfig, 0o600)
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Failed to write agent config: %v", err))
		return fmt.Errorf("failed to write agent config: %w", err)
	}

	srv := server.New(newLoggerFromZap(s.telemetrySettings.Logger, "opamp-server"))

	// Buffered so the onMessage callback never blocks when it reports the
	// single terminal result; doneReported guarantees at most one send.
	done := make(chan error, 1)
	var connected atomic.Bool
	var doneReported atomic.Bool

	// Start a one-shot server to get the Collector's agent description
	// and available components using the Collector's OpAMP extension.
	err = srv.Start(flattenedSettings{
		endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
		onConnecting: func(_ *http.Request) (bool, int) {
			// Remember that the agent reached us at all, so the timeout
			// branch below can produce a more specific diagnostic.
			connected.Store(true)
			return true, http.StatusOK
		},
		onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
			response := &protobufs.ServerToAgent{}
			if message.GetAvailableComponents() != nil {
				s.setAvailableComponents(message.AvailableComponents)
			}

			if message.AgentDescription != nil {
				instanceIDSeen := false
				s.setAgentDescription(message.AgentDescription)
				identAttr := message.AgentDescription.IdentifyingAttributes

				// The Collector must echo back the instance ID the Supervisor
				// assigned; anything else means we are talking to the wrong agent.
				for _, attr := range identAttr {
					if attr.Key == semconv.AttributeServiceInstanceID {
						if attr.Value.GetStringValue() != s.persistentState.InstanceID.String() {
							done <- fmt.Errorf(
								"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s): %w",
								attr.Value.GetStringValue(),
								s.persistentState.InstanceID.String(),
								errNonMatchingInstanceUID,
							)
							return response
						}
						instanceIDSeen = true
					}
				}

				if !instanceIDSeen {
					done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
					return response
				}
			}

			// agent description must be defined before we can finish bootstrap
			_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
			if !ok {
				return response
			}

			// if available components have not been reported, agent description is sufficient to continue
			availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
			if availableComponentsOk {
				// must have a full list of components if available components have been reported
				if availableComponents.GetComponents() != nil {
					// CompareAndSwap makes check-and-send atomic: the original
					// Load/send/Store sequence let two concurrent messages both
					// pass the check, and the second send on the 1-buffered
					// channel would hang the handler thread.
					if doneReported.CompareAndSwap(false, true) {
						done <- nil
					}
				} else {
					// if we don't have a full component list, ask for it
					response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
				}
				return response
			}

			// need to only report done once, not on each message - otherwise, we get a hung thread
			if doneReported.CompareAndSwap(false, true) {
				done <- nil
			}

			return response
		},
	}.toServerSettings())
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not start OpAMP server: %v", err))
		return err
	}

	defer func() {
		if stopErr := srv.Stop(context.Background()); stopErr != nil {
			err = errors.Join(err, fmt.Errorf("error when stopping the opamp server: %w", stopErr))
		}
	}()

	flags := []string{
		"--config", s.agentConfigFilePath(),
	}
	featuregateFlag := s.getFeatureGateFlag()
	if len(featuregateFlag) > 0 {
		flags = append(flags, featuregateFlag...)
	}
	cmd, err := commander.NewCommander(
		s.telemetrySettings.Logger,
		s.config.Storage.Directory,
		s.config.Agent,
		flags...,
	)
	if err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
		return err
	}

	if err = cmd.Start(context.Background()); err != nil {
		span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
		return err
	}

	defer func() {
		if stopErr := cmd.Stop(context.Background()); stopErr != nil {
			err = errors.Join(err, fmt.Errorf("error when stopping the collector: %w", stopErr))
		}
	}()

	select {
	case <-time.After(s.config.Agent.BootstrapTimeout):
		// Distinguish "never connected" from "connected but silent" so the
		// operator knows whether to suspect the port wiring or the extension.
		msg := "collector's OpAMP client never connected to the Supervisor"
		if connected.Load() {
			msg = "collector connected but never responded with an AgentDescription message"
		}
		span.SetStatus(codes.Error, msg)
		return errors.New(msg)
	case err = <-done:
		if errors.Is(err, errNonMatchingInstanceUID) {
			// try to report the issue to the OpAMP server on a best-effort basis
			if startOpAMPErr := s.startOpAMPClient(); startOpAMPErr == nil {
				defer func(s *Supervisor) {
					if stopErr := s.stopOpAMPClient(); stopErr != nil {
						s.telemetrySettings.Logger.Error("Could not stop OpAmp client", zap.Error(stopErr))
					}
				}(s)
				if healthErr := s.opampClient.SetHealth(&protobufs.ComponentHealth{
					Healthy:   false,
					LastError: err.Error(),
				}); healthErr != nil {
					s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(healthErr))
				}
			} else {
				s.telemetrySettings.Logger.Error("Could not start OpAMP client to report health to server", zap.Error(startOpAMPErr))
			}
		}
		if err != nil {
			s.telemetrySettings.Logger.Error("Could not complete bootstrap", zap.Error(err))
			span.SetStatus(codes.Error, err.Error())
		} else {
			span.SetStatus(codes.Ok, "")
		}
		return err
	}
}