extension/observer/endpointswatcher/endpointswatcher.go (165 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package endpointswatcher // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/endpointswatcher" import ( "encoding/json" "reflect" "sync" "time" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" ) var _ observer.Observable = (*EndpointsWatcher)(nil) // EndpointsWatcher provides a generic mechanism to run EndpointsLister.ListEndpoints every // RefreshInterval and report any new or removed endpoints to Notify instances registered // via ListAndWatch. Any observer that lists endpoints can make use of EndpointsWatcher // to poll for endpoints by embedding this struct and using NewEndpointsWatcher(). type EndpointsWatcher struct { EndpointsLister EndpointsLister RefreshInterval time.Duration // subscribed Notify instances ~sync.Map(map[NotifyID]Notify) toNotify sync.Map // map of NotifyID to known endpoints for that Notify (subscriptions can occur at different times in service startup). // ~sync.Map(map[NotifyID]map[EndpointID]Endpoint) existingEndpoints sync.Map stop chan struct{} once *sync.Once logger *zap.Logger } func New(endpointsLister EndpointsLister, refreshInterval time.Duration, logger *zap.Logger) *EndpointsWatcher { return &EndpointsWatcher{ EndpointsLister: endpointsLister, RefreshInterval: refreshInterval, existingEndpoints: sync.Map{}, toNotify: sync.Map{}, stop: make(chan struct{}), once: &sync.Once{}, logger: logger, } } // ListAndWatch runs EndpointsLister.ListEndpoints() on a regular interval and keeps track of the results // for alerting all subscribed Notify's of the based on the differences from the previous call. func (ew *EndpointsWatcher) ListAndWatch(notify observer.Notify) { ew.once.Do(func() { go func() { ticker := time.NewTicker(ew.RefreshInterval) defer ticker.Stop() for { select { case <-ew.stop: return case <-ticker.C: var toNotify []observer.NotifyID ew.toNotify.Range(func(notifyID, _ any) bool { toNotify = append(toNotify, notifyID.(observer.NotifyID)) return true }) ew.notifyOfLatestEndpoints(toNotify...) } } }() }) ew.toNotify.Store(notify.ID(), notify) ew.notifyOfLatestEndpoints(notify.ID()) } func (ew *EndpointsWatcher) Unsubscribe(notify observer.Notify) { ew.toNotify.Delete(notify.ID()) ew.existingEndpoints.Delete(notify.ID()) } // notifyOfLatestEndpoints alerts subscribed Notify instances by their NotifyID of latest Endpoint events, // updating their internal store with results of ListEndpoints() call. func (ew *EndpointsWatcher) notifyOfLatestEndpoints(notifyIDs ...observer.NotifyID) { latestEndpoints := ew.EndpointsLister.ListEndpoints() wg := &sync.WaitGroup{} for _, notifyID := range notifyIDs { var notify observer.Notify if n, ok := ew.toNotify.Load(notifyID); !ok { // an Unsubscribe() must have occurred during this call ew.logger.Debug("notifyOfEndpoints() ignoring instruction to notify non-subscribed Notify", zap.Any("notify", notifyID)) continue } else if notify, ok = n.(observer.Notify); !ok { ew.logger.Warn("failed to obtain notify instance from EndpointsWatcher", zap.Any("notify", n)) continue } wg.Add(1) go ew.updateAndNotifyOfEndpoints(notify, latestEndpoints, wg) } wg.Wait() } func (ew *EndpointsWatcher) updateAndNotifyOfEndpoints(notify observer.Notify, endpoints []observer.Endpoint, done *sync.WaitGroup) { defer done.Done() removedEndpoints, addedEndpoints, changedEndpoints := ew.updateEndpoints(notify, endpoints) if len(removedEndpoints) > 0 { ew.logEndpointEvent("removed endpoints", notify, removedEndpoints) notify.OnRemove(removedEndpoints) } if len(addedEndpoints) > 0 { ew.logEndpointEvent("added endpoints", notify, addedEndpoints) notify.OnAdd(addedEndpoints) } if len(changedEndpoints) > 0 { ew.logEndpointEvent("changed endpoints", notify, changedEndpoints) notify.OnChange(changedEndpoints) } } func (ew *EndpointsWatcher) updateEndpoints(notify observer.Notify, endpoints []observer.Endpoint) (removed, added, changed []observer.Endpoint) { notifyID := notify.ID() // Create map from ID to endpoint for lookup. endpointsMap := make(map[observer.EndpointID]struct{}, len(endpoints)) for _, e := range endpoints { endpointsMap[e.ID] = struct{}{} } le, _ := ew.existingEndpoints.LoadOrStore(notifyID, map[observer.EndpointID]observer.Endpoint{}) var storedEndpoints map[observer.EndpointID]observer.Endpoint var ok bool if storedEndpoints, ok = le.(map[observer.EndpointID]observer.Endpoint); !ok { ew.logger.Warn("failed to load Endpoint store from EndpointsWatcher", zap.Any("endpoints", le)) return } // copy to not modify sync.Map value directly (will be reloaded) existingEndpoints := map[observer.EndpointID]observer.Endpoint{} for id, endpoint := range storedEndpoints { existingEndpoints[id] = endpoint } // Iterate over the latest endpoints obtained. An endpoint needs // to be added or updated in case it is not already available in existingEndpoints or doesn't match // the latest value. for _, e := range endpoints { var existingEndpoint observer.Endpoint if existingEndpoint, ok = existingEndpoints[e.ID]; !ok { existingEndpoints[e.ID] = e added = append(added, e) } else if !endpointsEqual(e, existingEndpoint) { // Collect updated endpoints. existingEndpoints[e.ID] = e changed = append(changed, e) } } // If endpoint present in existingEndpoints does not exist in the latest // list, it needs to be removed. for id, e := range existingEndpoints { if _, ok = endpointsMap[e.ID]; !ok { delete(existingEndpoints, id) removed = append(removed, e) } } ew.existingEndpoints.Store(notifyID, existingEndpoints) return } // StopListAndWatch polling the ListEndpoints. func (ew *EndpointsWatcher) StopListAndWatch() { if ew.stop != nil { close(ew.stop) } } // EndpointsLister that provides a list of endpoints. type EndpointsLister interface { // ListEndpoints provides a list of endpoints and is expected to be // implemented by an observer looking for endpoints. ListEndpoints() []observer.Endpoint } func (ew *EndpointsWatcher) logEndpointEvent(msg string, notify observer.Notify, endpoints []observer.Endpoint) { if ce := ew.logger.Check(zap.DebugLevel, msg); ce != nil { fields := []zap.Field{zap.Any("notify", notify.ID())} for _, endpoint := range endpoints { if env, err := endpoint.Env(); err == nil { if marshaled, e := json.Marshal(env); e == nil { fields = append(fields, zap.String(string(endpoint.ID), string(marshaled))) } } } ce.Write(fields...) } } func endpointsEqual(lhs, rhs observer.Endpoint) bool { switch { case lhs.ID != rhs.ID: return false case lhs.Target != rhs.Target: return false case lhs.Details == nil && rhs.Details != nil: return false case rhs.Details == nil && lhs.Details != nil: return false case lhs.Details == nil && rhs.Details == nil: return true case lhs.Details.Type() != rhs.Details.Type(): return false default: return reflect.DeepEqual(lhs.Details.Env(), rhs.Details.Env()) } }