internal/sharedcomponent/sharedcomponent.go (125 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 // This is a copy of the internal module from opentelemetry-collector: // https://github.com/open-telemetry/opentelemetry-collector/blob/main/internal/sharedcomponent // Package sharedcomponent exposes functionality for components // to register against a shared key, such as a configuration object, in order to be reused across signal types. // This is particularly useful when the component relies on a shared resource such as os.File or http.Server. package sharedcomponent // import "github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent" import ( "context" "slices" "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" ) func NewMap[K comparable, V component.Component]() *Map[K, V] { return &Map[K, V]{ components: map[K]*Component[V]{}, } } // Map keeps reference of all created instances for a given shared key such as a component configuration. type Map[K comparable, V component.Component] struct { lock sync.Mutex components map[K]*Component[V] } // LoadOrStore returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error)) (*Component[V], error) { m.lock.Lock() defer m.lock.Unlock() if c, ok := m.components[key]; ok { return c, nil } comp, err := create() if err != nil { return nil, err } newComp := &Component[V]{ component: comp, removeFunc: func() { m.lock.Lock() defer m.lock.Unlock() delete(m.components, key) }, } m.components[key] = newComp return newComp, nil } // Component ensures that the wrapped component is started and stopped only once. // When stopped it is removed from the Map. type Component[V component.Component] struct { component V startOnce sync.Once stopOnce sync.Once removeFunc func() hostWrapper *hostWrapper } // Unwrap returns the original component. func (c *Component[V]) Unwrap() V { return c.component } // Start starts the underlying component if it never started before. func (c *Component[V]) Start(ctx context.Context, host component.Host) error { if c.hostWrapper == nil { var err error c.startOnce.Do(func() { c.hostWrapper = &hostWrapper{ host: host, sources: make([]componentstatus.Reporter, 0), previousEvents: make([]*componentstatus.Event, 0), } statusReporter, isStatusReporter := host.(componentstatus.Reporter) if isStatusReporter { c.hostWrapper.addSource(statusReporter) } // It's important that status for a shared component is reported through its // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStarting)) if err = c.component.Start(ctx, c.hostWrapper); err != nil { c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err)) } }) return err } statusReporter, isStatusReporter := host.(componentstatus.Reporter) if isStatusReporter { c.hostWrapper.addSource(statusReporter) } return nil } var _ component.Host = (*hostWrapper)(nil) var _ componentstatus.Reporter = (*hostWrapper)(nil) type hostWrapper struct { host component.Host sources []componentstatus.Reporter previousEvents []*componentstatus.Event lock sync.Mutex } func (h *hostWrapper) GetExtensions() map[component.ID]component.Component { return h.host.GetExtensions() } func (h *hostWrapper) Report(e *componentstatus.Event) { // Only remember an event if it will be emitted and it has not been sent already. h.lock.Lock() if len(h.sources) > 0 && !slices.Contains(h.previousEvents, e) { h.previousEvents = append(h.previousEvents, e) } h.lock.Unlock() h.lock.Lock() for _, s := range h.sources { s.Report(e) } h.lock.Unlock() } func (h *hostWrapper) addSource(s componentstatus.Reporter) { h.lock.Lock() for _, e := range h.previousEvents { s.Report(e) } h.lock.Unlock() h.lock.Lock() h.sources = append(h.sources, s) h.lock.Unlock() } // Shutdown shuts down the underlying component. func (c *Component[V]) Shutdown(ctx context.Context) error { var err error c.stopOnce.Do(func() { // It's important that status for a shared component is reported through its // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. if c.hostWrapper != nil { c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopping)) } err = c.component.Shutdown(ctx) if c.hostWrapper != nil { if err != nil { c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err)) } else { c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopped)) } } c.removeFunc() }) return err }