extension/apmconfigextension/opamp_callbacks.go (101 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmconfigextension // import "github.com/elastic/opentelemetry-collector-components/extension/apmconfigextension"
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"net/http"
"sync"
"github.com/elastic/opentelemetry-collector-components/extension/apmconfigextension/apmconfig"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"go.uber.org/zap"
)
type remoteConfigCallbacks struct {
*types.Callbacks
configClient apmconfig.RemoteConfigClient
agentState sync.Map
logger *zap.Logger
}
type agentInfo struct {
agentUid apmconfig.InstanceUid
identifyingAttributes apmconfig.IdentifyingAttributes
lastConfigHash []byte
}
func newRemoteConfigCallbacks(configClient apmconfig.RemoteConfigClient, logger *zap.Logger) *remoteConfigCallbacks {
opampCallbacks := &remoteConfigCallbacks{
configClient: configClient,
agentState: sync.Map{},
logger: logger,
}
connectionCallbacks := types.ConnectionCallbacks{}
connectionCallbacks.SetDefaults()
connectionCallbacks.OnMessage = opampCallbacks.onMessage
opampCallbacks.Callbacks = &types.Callbacks{
OnConnecting: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{
Accept: true,
HTTPStatusCode: 200,
ConnectionCallbacks: connectionCallbacks,
}
},
}
return opampCallbacks
}
func (rc *remoteConfigCallbacks) serverError(msg string, message *protobufs.ServerToAgent, logFields ...zap.Field) *protobufs.ServerToAgent {
message.ErrorResponse = &protobufs.ServerErrorResponse{
ErrorMessage: msg,
Type: protobufs.ServerErrorResponseType_ServerErrorResponseType_Unknown,
}
rc.logger.Error(message.ErrorResponse.ErrorMessage, logFields...)
return message
}
// OnMessage is called when a message is received from the connection. Can happen
// only after OnConnected(). Must return a ServerToAgent message that will be sent
// as a response to the Agent.
// For plain HTTP requests once OnMessage returns and the response is sent
// to the Agent the OnConnectionClose message will be called immediately.
func (rc *remoteConfigCallbacks) onMessage(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
serverToAgent := protobufs.ServerToAgent{}
serverToAgent.Capabilities = uint64(protobufs.ServerCapabilities_ServerCapabilities_OffersRemoteConfig)
serverToAgent.InstanceUid = message.GetInstanceUid()
if message.GetInstanceUid() == nil {
serverToAgent.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
return rc.serverError("instance_uid must be provided", &serverToAgent)
}
agentUid := hex.EncodeToString(message.GetInstanceUid())
if message.GetAgentDescription() != nil {
// new description might lead to another remote configuration
rc.agentState.Store(agentUid, agentInfo{
agentUid: message.GetInstanceUid(),
identifyingAttributes: message.AgentDescription.IdentifyingAttributes,
})
}
agentUidField := zap.String("instance_uid", agentUid)
if message.GetAgentDisconnect() != nil {
rc.logger.Info("Disconnecting the agent from the remote configuration service", agentUidField)
rc.agentState.Delete(agentUid)
return &serverToAgent
}
loadedAgent, _ := rc.agentState.LoadOrStore(agentUid, agentInfo{
agentUid: message.GetInstanceUid(),
})
agent, ok := loadedAgent.(agentInfo)
if !ok {
rc.logger.Warn("unexpected type in agentState cache", agentUidField)
return rc.serverError("internal error: invalid agent state", &serverToAgent)
}
remoteConfig, err := rc.configClient.RemoteConfig(ctx, agent.agentUid, agent.identifyingAttributes)
if err != nil {
// remote config client could not identify the agent
if errors.Is(err, apmconfig.UnidentifiedAgent) {
serverToAgent.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
}
return rc.serverError(fmt.Sprintf("error retrieving remote configuration: %s", err), &serverToAgent)
} else if remoteConfig == nil {
// nothing to be applied
return &serverToAgent
}
if message.GetRemoteConfigStatus() != nil && message.GetRemoteConfigStatus().Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED && bytes.Equal(remoteConfig.ConfigHash, message.RemoteConfigStatus.GetLastRemoteConfigHash()) {
rc.logger.Info("Remote config applied", zap.String("hash", hex.EncodeToString(remoteConfig.ConfigHash)), agentUidField)
agent.lastConfigHash = message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
rc.agentState.Store(agentUid, agent)
} else if !bytes.Equal(agent.lastConfigHash, remoteConfig.ConfigHash) {
rc.logger.Info("Sending new remote configuration", agentUidField, zap.String("hash", hex.EncodeToString(remoteConfig.ConfigHash)))
serverToAgent.RemoteConfig = remoteConfig
}
return &serverToAgent
}