in extension/apmconfigextension/opamp_callbacks.go [88:145]
func (rc *remoteConfigCallbacks) onMessage(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
serverToAgent := protobufs.ServerToAgent{}
serverToAgent.Capabilities = uint64(protobufs.ServerCapabilities_ServerCapabilities_OffersRemoteConfig)
serverToAgent.InstanceUid = message.GetInstanceUid()
if message.GetInstanceUid() == nil {
serverToAgent.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
return rc.serverError("instance_uid must be provided", &serverToAgent)
}
agentUid := hex.EncodeToString(message.GetInstanceUid())
if message.GetAgentDescription() != nil {
// new description might lead to another remote configuration
rc.agentState.Store(agentUid, agentInfo{
agentUid: message.GetInstanceUid(),
identifyingAttributes: message.AgentDescription.IdentifyingAttributes,
})
}
agentUidField := zap.String("instance_uid", agentUid)
if message.GetAgentDisconnect() != nil {
rc.logger.Info("Disconnecting the agent from the remote configuration service", agentUidField)
rc.agentState.Delete(agentUid)
return &serverToAgent
}
loadedAgent, _ := rc.agentState.LoadOrStore(agentUid, agentInfo{
agentUid: message.GetInstanceUid(),
})
agent, ok := loadedAgent.(agentInfo)
if !ok {
rc.logger.Warn("unexpected type in agentState cache", agentUidField)
return rc.serverError("internal error: invalid agent state", &serverToAgent)
}
remoteConfig, err := rc.configClient.RemoteConfig(ctx, agent.agentUid, agent.identifyingAttributes)
if err != nil {
// remote config client could not identify the agent
if errors.Is(err, apmconfig.UnidentifiedAgent) {
serverToAgent.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
}
return rc.serverError(fmt.Sprintf("error retrieving remote configuration: %s", err), &serverToAgent)
} else if remoteConfig == nil {
// nothing to be applied
return &serverToAgent
}
if message.GetRemoteConfigStatus() != nil && message.GetRemoteConfigStatus().Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED && bytes.Equal(remoteConfig.ConfigHash, message.RemoteConfigStatus.GetLastRemoteConfigHash()) {
rc.logger.Info("Remote config applied", zap.String("hash", hex.EncodeToString(remoteConfig.ConfigHash)), agentUidField)
agent.lastConfigHash = message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
rc.agentState.Store(agentUid, agent)
} else if !bytes.Equal(agent.lastConfigHash, remoteConfig.ConfigHash) {
rc.logger.Info("Sending new remote configuration", agentUidField, zap.String("hash", hex.EncodeToString(remoteConfig.ConfigHash)))
serverToAgent.RemoteConfig = remoteConfig
}
return &serverToAgent
}