pkg/controller/elasticsearch/observer/manager.go (151 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package observer
import (
"context"
"sync"
"time"
"go.elastic.co/apm/v2"
"k8s.io/apimachinery/pkg/types"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/annotation"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
)
const (
	// ObserverIntervalAnnotation is the name of the annotation used to set the observation interval for a cluster.
	// Its value is parsed as a duration (see extractObserverSettings); when it yields a value <= 0,
	// asynchronous observation is disabled and observations only happen at reconciliation time.
	ObserverIntervalAnnotation = "eck.k8s.elastic.co/es-observer-interval"
)
// Manager for a set of observers
type Manager struct {
	// defaultInterval is the observation interval used unless overridden per cluster
	// through the ObserverIntervalAnnotation annotation.
	defaultInterval time.Duration
	// observerLock guards all access to the observers map.
	observerLock sync.RWMutex
	// observers holds one running observer per observed Elasticsearch cluster.
	observers map[types.NamespacedName]*Observer
	// listenerLock guards all access to the listeners slice.
	listenerLock sync.RWMutex
	listeners []OnObservation // invoked on each observation event
	// tracer is the (possibly nil) APM tracer propagated to each observer through its Settings.
	tracer *apm.Tracer
}
// NewManager builds a Manager with the given default observation interval and
// optional APM tracer, starting with no observed clusters.
func NewManager(defaultInterval time.Duration, tracer *apm.Tracer) *Manager {
	m := Manager{
		defaultInterval: defaultInterval,
		observers:       map[types.NamespacedName]*Observer{},
		tracer:          tracer,
	}
	return &m
}
// ObservedStateResolver returns a function reporting the last known health of
// the given cluster, as expected by the main reconciliation driver.
func (m *Manager) ObservedStateResolver(
	ctx context.Context,
	cluster esv1.Elasticsearch,
	esClientProvider func(client.Client) client.Client,
	isServiceReady bool,
) func() esv1.ElasticsearchHealth {
	// Return the observer's LastHealth as a bound method value: each call
	// reflects the most recent observation of that cluster.
	return m.Observe(ctx, cluster, esClientProvider, isServiceReady).LastHealth
}
// getObserver returns the observer currently registered for the given cluster
// key, plus a boolean reporting whether one exists.
func (m *Manager) getObserver(key types.NamespacedName) (*Observer, bool) {
	m.observerLock.RLock()
	defer m.observerLock.RUnlock()
	obs, found := m.observers[key]
	return obs, found
}
// Observe gets or creates a cluster state observer for the given cluster.
// In case something has changed in the given esClient (eg. different caCert) or in the
// observer settings, the observer is recreated accordingly.
func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClientProvider func(client.Client) client.Client, isServiceReady bool) *Observer {
	defer tracing.Span(&ctx)()
	nsName := k8s.ExtractNamespacedName(&cluster)
	settings := m.extractObserverSettings(ctx, cluster)
	observer, exists := m.getObserver(nsName)
	// build the Elasticsearch client, reusing the existing observer's client when possible
	var esClient client.Client
	if exists {
		esClient = esClientProvider(observer.esClient)
	} else {
		esClient = esClientProvider(nil)
	}
	switch {
	case !exists:
		// This Elasticsearch resource has not been observed yet, create the observer and maybe do a first observation.
		observer = m.createOrReplaceObserver(ctx, nsName, settings, esClient)
	case exists && (!observer.esClient.Equal(esClient) || observer.settings != settings):
		// The client or the settings changed: replace the existing observer.
		// This Elasticsearch resource is already being observed asynchronously, no need to do a first observation.
		observer = m.createOrReplaceObserver(ctx, nsName, settings, esClient)
	case exists && settings.ObservationInterval <= 0:
		// in case asynchronous observation has been disabled ensure at least one observation at reconciliation time.
		return m.getAndObserveSynchronously(ctx, nsName)
	default:
		// No change, return the existing observer.
		return observer
	}
	if !exists && isServiceReady {
		// there was no existing observer and Service is ready: let's try an initial synchronous observation
		observer.observe(ctx)
	}
	// start the new observer (asynchronous periodic observation)
	observer.Start()
	return observer
}
// extractObserverSettings builds the observer Settings for a cluster: the
// observation interval comes from the ObserverIntervalAnnotation annotation
// (falling back to the manager-wide default), and the manager's tracer is
// propagated as-is.
func (m *Manager) extractObserverSettings(ctx context.Context, cluster esv1.Elasticsearch) Settings {
	interval := annotation.ExtractTimeout(ctx, cluster.ObjectMeta, ObserverIntervalAnnotation, m.defaultInterval)
	return Settings{
		ObservationInterval: interval,
		Tracer:              m.tracer,
	}
}
// createOrReplaceObserver creates a new observer and adds it to the observers map, replacing existing observers if necessary.
// The new observer is not started, it is up to the caller to invoke observer.Start().
func (m *Manager) createOrReplaceObserver(ctx context.Context, cluster types.NamespacedName, settings Settings, esClient client.Client) *Observer {
	defer tracing.Span(&ctx)()
	// write lock: we may mutate the observers map below
	m.observerLock.Lock()
	defer m.observerLock.Unlock()
	observer, exists := m.observers[cluster]
	if exists {
		// stop the stale observer before replacing it so its goroutine does not leak
		log.Info("Replacing observer", "namespace", cluster.Namespace, "es_name", cluster.Name)
		observer.Stop()
		delete(m.observers, cluster)
	}
	observer = NewObserver(cluster, esClient, settings, m.notifyListeners)
	m.observers[cluster] = observer
	return observer
}
// getAndObserveSynchronously retrieves the currently configured observer and
// triggers one synchronous observation on it.
// Invariant: callers must guarantee an observer is registered for this cluster.
func (m *Manager) getAndObserveSynchronously(ctx context.Context, cluster types.NamespacedName) *Observer {
	defer tracing.Span(&ctx)()
	m.observerLock.RLock()
	defer m.observerLock.RUnlock()
	obs := m.observers[cluster]
	// run one observation right now rather than waiting for the periodic loop
	obs.observe(ctx)
	return obs
}
// List returns the names of all clusters currently being observed.
func (m *Manager) List() []types.NamespacedName {
	m.observerLock.RLock()
	defer m.observerLock.RUnlock()
	keys := make([]types.NamespacedName, 0, len(m.observers))
	for key := range m.observers {
		keys = append(keys, key)
	}
	return keys
}
// AddObservationListener registers a listener to be invoked on every
// observation event.
func (m *Manager) AddObservationListener(listener OnObservation) {
	m.listenerLock.Lock()
	m.listeners = append(m.listeners, listener)
	m.listenerLock.Unlock()
}
// notifyListeners notifies all listeners that an observation occurred.
// With zero or one listener the call is made inline; with several listeners they
// run concurrently in goroutines. The read lock is held while reading the
// listeners slice and released before waiting on the goroutines, so a slow
// listener cannot block AddObservationListener indefinitely.
func (m *Manager) notifyListeners(cluster types.NamespacedName, previousState, newState esv1.ElasticsearchHealth) {
	m.listenerLock.RLock()
	switch len(m.listeners) {
	case 0:
		// nothing to notify
		m.listenerLock.RUnlock()
		return
	case 1:
		// single listener: invoke directly, avoiding goroutine + WaitGroup overhead
		// (note: the listener here runs while the read lock is still held)
		m.listeners[0](cluster, previousState, newState)
		m.listenerLock.RUnlock()
		return
	default:
		// several listeners: fan out concurrently, then wait for all of them
		var wg sync.WaitGroup
		for _, l := range m.listeners {
			wg.Add(1)
			go func(f OnObservation) {
				f(cluster, previousState, newState)
				wg.Done()
			}(l) // pass l explicitly to avoid loop-variable capture on older Go versions
		}
		// release the lock before blocking on listener completion
		m.listenerLock.RUnlock()
		wg.Wait()
	}
}
// StopObserving stops and unregisters the observer for the given cluster.
// It is a no-op when the cluster is not currently observed.
func (m *Manager) StopObserving(key types.NamespacedName) {
	log.Info("Stopping observer", "namespace", key.Namespace, "es_name", key.Name)
	m.observerLock.Lock()
	defer m.observerLock.Unlock()
	obs, found := m.observers[key]
	if !found {
		return
	}
	obs.Stop()
	delete(m.observers, key)
}