pkg/status/aggregator.go (171 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package status // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
import (
"container/list"
"fmt"
"strings"
"sync"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/pipeline"
)
// Extensions are treated as a pseudo pipeline and extsID is used as a map key
var extsID = pipeline.MustNewID("extensions")
// extensionIDIter is an iterator that is substituted for AllPipelineIDs for
// the extensions pseudo pipeline.
func extensionIDIter(f func(pipeline.ID) bool) {
_ = f(extsID)
}
// Note: this interface had to be introduced because we need to be able to rewrite the
// timestamps of some events during aggregation. The implementation in core doesn't currently
// allow this, but this interface provides a workaround.
type Event interface {
Status() componentstatus.Status
Err() error
Timestamp() time.Time
}
// Scope refers to a part of an AggregateStatus. The zero-value, aka ScopeAll,
// refers to the entire AggregateStatus. ScopeExtensions refers to the extensions
// subtree, and any other value refers to a pipeline subtree.
type Scope string
const (
ScopeAll Scope = ""
ScopeExtensions Scope = "extensions"
pipelinePrefix string = "pipeline:"
)
func (s Scope) toKey() string {
if s == ScopeAll || s == ScopeExtensions {
return string(s)
}
return pipelinePrefix + string(s)
}
type Verbosity bool
const (
Verbose Verbosity = true
Concise = false
)
// AggregateStatus contains a map of child AggregateStatuses and an embedded Event.
// It can be used to represent a single, top-level status when the ComponentStatusMap
// is empty, or a nested structure when map is non-empty.
type AggregateStatus struct {
Event
ComponentStatusMap map[string]*AggregateStatus
}
func (a *AggregateStatus) clone(verbosity Verbosity) *AggregateStatus {
st := &AggregateStatus{
Event: a.Event,
}
if verbosity == Verbose && len(a.ComponentStatusMap) > 0 {
st.ComponentStatusMap = make(map[string]*AggregateStatus, len(a.ComponentStatusMap))
for k, cs := range a.ComponentStatusMap {
st.ComponentStatusMap[k] = cs.clone(verbosity)
}
}
return st
}
type subscription struct {
statusCh chan *AggregateStatus
verbosity Verbosity
}
// UnsubscribeFunc is a function used to unsubscribe from a stream.
type UnsubscribeFunc func()
// Aggregator records individual status events for components and aggregates statuses for the
// pipelines they belong to and the collector overall.
type Aggregator struct {
// mu protects aggregateStatus and subscriptions from concurrent modification
mu sync.RWMutex
aggregateStatus *AggregateStatus
subscriptions map[string]*list.List
aggregationFunc aggregationFunc
}
// NewAggregator returns a *status.Aggregator.
func NewAggregator(errPriority ErrorPriority) *Aggregator {
return &Aggregator{
aggregateStatus: &AggregateStatus{
Event: &componentstatus.Event{},
ComponentStatusMap: make(map[string]*AggregateStatus),
},
subscriptions: make(map[string]*list.List),
aggregationFunc: newAggregationFunc(errPriority),
}
}
// AggregateStatus returns an *AggregateStatus for the given scope. The scope can be the collector
// overall (ScopeAll), extensions (ScopeExtensions), or a pipeline by name. Detail specifies whether
// or not subtrees should be returned with the *AggregateStatus. The boolean return value indicates
// whether or not the scope was found.
func (a *Aggregator) AggregateStatus(scope Scope, verbosity Verbosity) (*AggregateStatus, bool) {
a.mu.RLock()
defer a.mu.RUnlock()
if scope == ScopeAll {
return a.aggregateStatus.clone(verbosity), true
}
st, ok := a.aggregateStatus.ComponentStatusMap[scope.toKey()]
if !ok {
return nil, false
}
return st.clone(verbosity), true
}
// RecordStatus stores and aggregates a StatusEvent for the given component instance.
func (a *Aggregator) RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) {
allPipelineIDs := source.AllPipelineIDs
// extensions are treated as a pseudo-pipeline
if source.Kind() == component.KindExtension {
allPipelineIDs = extensionIDIter
}
a.mu.Lock()
defer a.mu.Unlock()
allPipelineIDs(func(compID pipeline.ID) bool {
var pipelineStatus *AggregateStatus
pipelineScope := Scope(compID.String())
pipelineKey := pipelineScope.toKey()
pipelineStatus, ok := a.aggregateStatus.ComponentStatusMap[pipelineKey]
if !ok {
pipelineStatus = &AggregateStatus{
ComponentStatusMap: make(map[string]*AggregateStatus),
}
}
componentKey := fmt.Sprintf("%s:%s", strings.ToLower(source.Kind().String()), source.ComponentID())
pipelineStatus.ComponentStatusMap[componentKey] = &AggregateStatus{
Event: event,
}
a.aggregateStatus.ComponentStatusMap[pipelineKey] = pipelineStatus
pipelineStatus.Event = a.aggregationFunc(pipelineStatus)
a.notifySubscribers(pipelineScope, pipelineStatus)
return true
})
a.aggregateStatus.Event = a.aggregationFunc(a.aggregateStatus)
a.notifySubscribers(ScopeAll, a.aggregateStatus)
}
// Subscribe allows you to subscribe to a stream of events for the given scope. The scope can be
// the collector overall (ScopeAll), extensions (ScopeExtensions), or a pipeline name.
// It is possible to subscribe to a pipeline that has not yet reported. An initial nil
// will be sent on the channel and events will start streaming if and when it starts reporting.
// A `Verbose` verbosity specifies that subtrees should be returned with the *AggregateStatus.
// To unsubscribe, call the returned UnsubscribeFunc.
func (a *Aggregator) Subscribe(scope Scope, verbosity Verbosity) (<-chan *AggregateStatus, UnsubscribeFunc) {
a.mu.Lock()
defer a.mu.Unlock()
key := scope.toKey()
st := a.aggregateStatus
if scope != ScopeAll {
st = st.ComponentStatusMap[key]
}
if st != nil {
st = st.clone(verbosity)
}
sub := &subscription{
statusCh: make(chan *AggregateStatus, 1),
verbosity: verbosity,
}
subList, ok := a.subscriptions[key]
if !ok {
subList = list.New()
a.subscriptions[key] = subList
}
el := subList.PushBack(sub)
unsubFunc := func() {
a.mu.Lock()
defer a.mu.Unlock()
subList.Remove(el)
if subList.Front() == nil {
delete(a.subscriptions, key)
}
}
sub.statusCh <- st
return sub.statusCh, unsubFunc
}
// Close terminates all existing subscriptions.
func (a *Aggregator) Close() {
a.mu.Lock()
defer a.mu.Unlock()
for _, subList := range a.subscriptions {
for el := subList.Front(); el != nil; el = el.Next() {
sub := el.Value.(*subscription)
close(sub.statusCh)
}
}
}
func (a *Aggregator) notifySubscribers(scope Scope, status *AggregateStatus) {
subList, ok := a.subscriptions[scope.toKey()]
if !ok {
return
}
for el := subList.Front(); el != nil; el = el.Next() {
sub := el.Value.(*subscription)
// clear unread events
select {
case <-sub.statusCh:
default:
}
sub.statusCh <- status.clone(sub.verbosity)
}
}