extension/opampextension/registry.go (146 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" import ( "container/list" "errors" "fmt" "slices" "sync" "github.com/open-telemetry/opamp-go/protobufs" "go.uber.org/zap" "golang.org/x/exp/maps" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" ) // customCapabilityClient is a subset of OpAMP client containing only the methods needed for the customCapabilityRegistry. type customCapabilityClient interface { SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) } type customCapabilityRegistry struct { mux *sync.Mutex capabilityToMsgChannels map[string]*list.List client customCapabilityClient logger *zap.Logger } var _ opampcustommessages.CustomCapabilityRegistry = (*customCapabilityRegistry)(nil) func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { return &customCapabilityRegistry{ mux: &sync.Mutex{}, capabilityToMsgChannels: make(map[string]*list.List), client: client, logger: logger, } } // Register implements CustomCapabilityRegistry.Register func (cr *customCapabilityRegistry) Register(capability string, opts ...opampcustommessages.CustomCapabilityRegisterOption) (opampcustommessages.CustomCapabilityHandler, error) { optsStruct := opampcustommessages.DefaultCustomCapabilityRegisterOptions() for _, opt := range opts { opt(optsStruct) } cr.mux.Lock() defer cr.mux.Unlock() capabilities := cr.capabilities() if !slices.Contains(capabilities, capability) { capabilities = append(capabilities, capability) } err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ Capabilities: capabilities, }) if err != nil { return nil, fmt.Errorf("set custom capabilities: %w", err) } capabilityList := cr.capabilityToMsgChannels[capability] if capabilityList == nil { capabilityList = list.New() cr.capabilityToMsgChannels[capability] = capabilityList } msgChan := make(chan *protobufs.CustomMessage, optsStruct.MaxQueuedMessages) callbackElem := capabilityList.PushBack(msgChan) unregisterFunc := cr.removeCapabilityFunc(capability, callbackElem) sender := newCustomMessageHandler(cr, cr.client, capability, msgChan, unregisterFunc) return sender, nil } // ProcessMessage processes a custom message, asynchronously broadcasting it to all registered capability handlers for // the messages capability. func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { cr.mux.Lock() defer cr.mux.Unlock() msgChannels, ok := cr.capabilityToMsgChannels[cm.Capability] if !ok { return } for node := msgChannels.Front(); node != nil; node = node.Next() { msgChan, ok := node.Value.(chan *protobufs.CustomMessage) if !ok { continue } // If the channel is full, we will skip sending the message to the receiver. // We do this because we don't want a misbehaving component to be able to // block the opamp extension, or block other components from receiving messages. select { case msgChan <- cm: default: } } } // removeCapabilityFunc returns a func that removes the custom capability with the given msg channel list element and sender, // then recalculates and sets the list of custom capabilities on the OpAMP client. func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element) func() { return func() { cr.mux.Lock() defer cr.mux.Unlock() msgChanList := cr.capabilityToMsgChannels[capability] msgChanList.Remove(callbackElement) if msgChanList.Front() == nil { // Since there are no more callbacks for this capability, // this capability is no longer supported delete(cr.capabilityToMsgChannels, capability) } capabilities := cr.capabilities() err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ Capabilities: capabilities, }) if err != nil { // It's OK if we couldn't actually remove the capability, it just means we won't // notify the server properly, and the server may send us messages that we have no associated callbacks for. cr.logger.Error("Failed to set new capabilities", zap.Error(err)) } } } // capabilities gives the current set of custom capabilities with at least one // callback registered. func (cr *customCapabilityRegistry) capabilities() []string { return maps.Keys(cr.capabilityToMsgChannels) } type customMessageHandler struct { // unregisteredMux protects unregistered, and makes sure that a message cannot be sent // on an unregistered capability. unregisteredMux *sync.Mutex capability string opampClient customCapabilityClient registry *customCapabilityRegistry sendChan <-chan *protobufs.CustomMessage unregisterCapabilityFunc func() unregistered bool } var _ opampcustommessages.CustomCapabilityHandler = (*customMessageHandler)(nil) func newCustomMessageHandler( registry *customCapabilityRegistry, opampClient customCapabilityClient, capability string, sendChan <-chan *protobufs.CustomMessage, unregisterCapabilityFunc func(), ) *customMessageHandler { return &customMessageHandler{ unregisteredMux: &sync.Mutex{}, capability: capability, opampClient: opampClient, registry: registry, sendChan: sendChan, unregisterCapabilityFunc: unregisterCapabilityFunc, } } // Message implements CustomCapabilityHandler.Message func (c *customMessageHandler) Message() <-chan *protobufs.CustomMessage { return c.sendChan } // SendMessage implements CustomCapabilityHandler.SendMessage func (c *customMessageHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() if c.unregistered { return nil, errors.New("capability has already been unregistered") } cm := &protobufs.CustomMessage{ Capability: c.capability, Type: messageType, Data: message, } return c.opampClient.SendCustomMessage(cm) } // Unregister implements CustomCapabilityHandler.Unregister func (c *customMessageHandler) Unregister() { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() c.unregistered = true c.unregisterCapabilityFunc() }