// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// This is a copy of the internal module from opentelemetry-collector:
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/internal/sharedcomponent

// Package sharedcomponent exposes functionality for components
// to register against a shared key, such as a configuration object, in order to be reused across signal types.
// This is particularly useful when the component relies on a shared resource such as os.File or http.Server.
package sharedcomponent // import "github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent"

import (
	"context"
	"slices"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componentstatus"
)

// NewMap returns a new Map that tracks the shared Component created for each key.
func NewMap[K comparable, V component.Component]() *Map[K, V] {
return &Map[K, V]{
components: map[K]*Component[V]{},
}
}

// Map keeps a reference to every created instance for a given shared key, such as a component configuration.
type Map[K comparable, V component.Component] struct {
lock sync.Mutex
components map[K]*Component[V]
}

// LoadOrStore returns the already created instance if it exists; otherwise it creates a new
// instance using create and adds it to the map of references.
func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error)) (*Component[V], error) {
m.lock.Lock()
defer m.lock.Unlock()
if c, ok := m.components[key]; ok {
return c, nil
}
comp, err := create()
if err != nil {
return nil, err
}
newComp := &Component[V]{
component: comp,
removeFunc: func() {
m.lock.Lock()
defer m.lock.Unlock()
delete(m.components, key)
},
}
m.components[key] = newComp
return newComp, nil
}
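
// A minimal usage sketch with hypothetical names (receivers, myReceiver, newMyReceiver,
// createTraces are illustrative only, not part of this package): factories for different
// signal types that pass the same key get back the same shared *Component.
//
//	var receivers = sharedcomponent.NewMap[component.ID, *myReceiver]()
//
//	func createTraces(ctx context.Context, set receiver.Settings, cfg component.Config, next consumer.Traces) (receiver.Traces, error) {
//		// The traces and logs factories resolve to the same *Component as long as
//		// they use the same key (here the component ID).
//		return receivers.LoadOrStore(set.ID, func() (*myReceiver, error) {
//			return newMyReceiver(cfg)
//		})
//	}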

// Component ensures that the wrapped component is started and stopped only once.
// When stopped, it is removed from the Map.
type Component[V component.Component] struct {
component V
startOnce sync.Once
stopOnce sync.Once
removeFunc func()
hostWrapper *hostWrapper
}

// Unwrap returns the original component.
func (c *Component[V]) Unwrap() V {
return c.component
}
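
// Unwrap is the hook for per-signal setup on the shared instance. A sketch, again with
// hypothetical names (registerTracesConsumer is illustrative, not part of this package):
//
//	shared, err := receivers.LoadOrStore(set.ID, func() (*myReceiver, error) { return newMyReceiver(cfg) })
//	if err != nil {
//		return nil, err
//	}
//	shared.Unwrap().registerTracesConsumer(next)
//	return shared, nil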

// Start starts the underlying component if it has never been started before. On later
// calls it only registers the caller's host as an additional status source.
func (c *Component[V]) Start(ctx context.Context, host component.Host) error {
if c.hostWrapper == nil {
var err error
c.startOnce.Do(func() {
c.hostWrapper = &hostWrapper{
host: host,
sources: make([]componentstatus.Reporter, 0),
previousEvents: make([]*componentstatus.Event, 0),
}
statusReporter, isStatusReporter := host.(componentstatus.Reporter)
if isStatusReporter {
c.hostWrapper.addSource(statusReporter)
}
// It's important that status for a shared component is reported through its
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// status reporting in graph a no-op.
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStarting))
if err = c.component.Start(ctx, c.hostWrapper); err != nil {
c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err))
}
})
return err
}
	// The component was already started by another signal's pipeline; only register this
	// caller's host as an additional status source so it receives replayed and future events.
	statusReporter, isStatusReporter := host.(componentstatus.Reporter)
if isStatusReporter {
c.hostWrapper.addSource(statusReporter)
}
return nil
}
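
// Calling Start once per signal pipeline is therefore safe: only the first call starts the
// wrapped component, while later calls only add the caller's host as a status source.
// A sketch of the observable behavior (tracesHost and logsHost are hypothetical hosts):
//
//	_ = shared.Start(ctx, tracesHost) // starts the component, reports StatusStarting
//	_ = shared.Start(ctx, logsHost)   // no second start; logsHost receives past and future status events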

var _ component.Host = (*hostWrapper)(nil)
var _ componentstatus.Reporter = (*hostWrapper)(nil)

// hostWrapper wraps the component.Host passed to the first Start call and fans status
// events out to every registered reporter.
type hostWrapper struct {
host component.Host
sources []componentstatus.Reporter
previousEvents []*componentstatus.Event
lock sync.Mutex
}

// GetExtensions delegates to the host the component was first started with.
func (h *hostWrapper) GetExtensions() map[component.ID]component.Component {
return h.host.GetExtensions()
}

// Report remembers the event for replay to reporters added later and forwards it to every
// currently registered reporter.
func (h *hostWrapper) Report(e *componentstatus.Event) {
// Only remember an event if it will be emitted and it has not been sent already.
h.lock.Lock()
if len(h.sources) > 0 && !slices.Contains(h.previousEvents, e) {
h.previousEvents = append(h.previousEvents, e)
}
h.lock.Unlock()
h.lock.Lock()
for _, s := range h.sources {
s.Report(e)
}
h.lock.Unlock()
}

// addSource registers an additional status reporter, first replaying every previously
// reported event to it.
func (h *hostWrapper) addSource(s componentstatus.Reporter) {
h.lock.Lock()
for _, e := range h.previousEvents {
s.Report(e)
}
h.lock.Unlock()
h.lock.Lock()
h.sources = append(h.sources, s)
h.lock.Unlock()
}
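
// Together, Report and addSource make status delivery order-independent for late joiners:
// an event is remembered when at least one reporter is already registered and it has not
// been seen before, and every remembered event is replayed to a reporter when it is added.
// A sketch (tracesHost, logsHost, and startingEvent are hypothetical):
//
//	w.addSource(tracesHost) // first reporter
//	w.Report(startingEvent) // remembered and forwarded to tracesHost
//	w.addSource(logsHost)   // startingEvent is replayed to logsHost, which then receives future events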

// Shutdown shuts down the underlying component if it has not been shut down before,
// and removes it from the Map. Later calls are no-ops that return nil.
func (c *Component[V]) Shutdown(ctx context.Context) error {
var err error
c.stopOnce.Do(func() {
// It's important that status for a shared component is reported through its
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// status reporting in graph a no-op.
if c.hostWrapper != nil {
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopping))
}
err = c.component.Shutdown(ctx)
if c.hostWrapper != nil {
if err != nil {
c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err))
} else {
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopped))
}
}
c.removeFunc()
})
return err
}
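
// Shutdown mirrors Start: every pipeline that uses the shared component may call it, but
// only the first call shuts the wrapped component down, reports its status, and removes it
// from the Map. A sketch of the observable behavior:
//
//	err1 := shared.Shutdown(ctx) // shuts down the component, reports StatusStopping then StatusStopped
//	err2 := shared.Shutdown(ctx) // no-op, err2 is nil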