func()

in extension/apmconfigextension/opamp_callbacks.go [88:145]


// onMessage handles a single AgentToServer message and builds the
// ServerToAgent reply.
//
// The reply always advertises the OffersRemoteConfig capability and echoes the
// instance UID of the incoming message. Processing then proceeds in order:
//   - a message without an instance UID is answered through rc.serverError
//     with the ReportFullState flag set;
//   - a message carrying an agent description stores a fresh agentInfo in
//     rc.agentState, replacing any cached entry (and therefore clearing its
//     lastConfigHash);
//   - a disconnect message deletes the agent from rc.agentState and returns
//     the reply without a remote configuration;
//   - otherwise the remote configuration is requested from rc.configClient and
//     attached to the reply unless its hash equals the hash cached for the
//     agent.
//
// The conn parameter is not used by this method.
func (rc *remoteConfigCallbacks) onMessage(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
	uid := message.GetInstanceUid()
	serverToAgent := protobufs.ServerToAgent{
		Capabilities: uint64(protobufs.ServerCapabilities_ServerCapabilities_OffersRemoteConfig),
		InstanceUid:  uid,
	}

	// Use len() rather than a nil comparison: an empty but non-nil UID would
	// otherwise be accepted and cached under the empty key "".
	if len(uid) == 0 {
		serverToAgent.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
		return rc.serverError("instance_uid must be provided", &serverToAgent)
	}

	agentUid := hex.EncodeToString(uid)
	if description := message.GetAgentDescription(); description != nil {
		// new description might lead to another remote configuration
		rc.agentState.Store(agentUid, agentInfo{
			agentUid:              uid,
			identifyingAttributes: description.IdentifyingAttributes,
		})
	}

	agentUidField := zap.String("instance_uid", agentUid)
	if message.GetAgentDisconnect() != nil {
		rc.logger.Info("Disconnecting the agent from the remote configuration service", agentUidField)
		rc.agentState.Delete(agentUid)
		return &serverToAgent
	}

	// Agents that never sent a description are cached with their UID only.
	loadedAgent, _ := rc.agentState.LoadOrStore(agentUid, agentInfo{
		agentUid: uid,
	})
	agent, ok := loadedAgent.(agentInfo)
	if !ok {
		rc.logger.Warn("unexpected type in agentState cache", agentUidField)
		return rc.serverError("internal error: invalid agent state", &serverToAgent)
	}

	remoteConfig, err := rc.configClient.RemoteConfig(ctx, agent.agentUid, agent.identifyingAttributes)
	if err != nil {
		// remote config client could not identify the agent
		if errors.Is(err, apmconfig.UnidentifiedAgent) {
			serverToAgent.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
		}
		return rc.serverError(fmt.Sprintf("error retrieving remote configuration: %s", err), &serverToAgent)
	}
	if remoteConfig == nil {
		// nothing to be applied
		return &serverToAgent
	}

	// The agent confirms a configuration when it reports the APPLIED status
	// together with the hash of the configuration that was just retrieved.
	status := message.GetRemoteConfigStatus()
	applied := status != nil &&
		status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED &&
		bytes.Equal(remoteConfig.ConfigHash, status.GetLastRemoteConfigHash())

	switch {
	case applied:
		rc.logger.Info("Remote config applied", zap.String("hash", hex.EncodeToString(remoteConfig.ConfigHash)), agentUidField)
		// Remember the confirmed hash so the same configuration is not sent again.
		agent.lastConfigHash = status.GetLastRemoteConfigHash()
		rc.agentState.Store(agentUid, agent)
	case !bytes.Equal(agent.lastConfigHash, remoteConfig.ConfigHash):
		rc.logger.Info("Sending new remote configuration", agentUidField, zap.String("hash", hex.EncodeToString(remoteConfig.ConfigHash)))
		serverToAgent.RemoteConfig = remoteConfig
	}

	return &serverToAgent
}