internal/pkg/monitor/subscription_monitor.go (120 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package monitor
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/rs/zerolog"
"github.com/elastic/go-elasticsearch/v8"
"golang.org/x/sync/errgroup"
)
const (
defaultSubscriptionTimeout = 60 * time.Second // max amount of time subscription has to read from channel
)
var gCounter uint64
// Subscription is a subscription to get notified for new documents.
type Subscription interface {
// Output is the channel the monitor send new documents to
Output() <-chan []es.HitT
}
// Monitor monitors for new documents in an index and sends them to its subscriptions.
type Monitor interface {
// The BaseMonitor methods
BaseMonitor
// Subscribe to get notified of documents
Subscribe() Subscription
// Unsubscribe from getting notifications on documents
Unsubscribe(sub Subscription)
}
// subT is a subscription to get notified for new documents.
type subT struct {
idx uint64
c chan []es.HitT
}
// Output returns the subscription channel.
func (s *subT) Output() <-chan []es.HitT {
return s.c
}
// monitorT monitors for new documents in an index.
type monitorT struct {
sm SimpleMonitor
mut sync.RWMutex
subs map[uint64]*subT
subTimeout time.Duration
}
// New creates new subscription monitor.
func New(index string, esCli, monCli *elasticsearch.Client, opts ...Option) (Monitor, error) {
sm, err := NewSimple(index, esCli, monCli, opts...)
if err != nil {
return nil, err
}
m := &monitorT{
sm: sm,
subs: make(map[uint64]*subT),
subTimeout: defaultSubscriptionTimeout,
}
return m, nil
}
// GetCheckpoint implements the GlobalCheckpointProvider interface.
func (m *monitorT) GetCheckpoint() sqn.SeqNo {
return m.sm.GetCheckpoint()
}
// Subscribe returns a Subscription that is used to get notified of documents.
func (m *monitorT) Subscribe() Subscription {
idx := atomic.AddUint64(&gCounter, 1)
s := &subT{
idx: idx,
c: make(chan []es.HitT, 1),
}
m.mut.Lock()
m.subs[idx] = s
m.mut.Unlock()
return s
}
// Unsubscribe removes a subscription from the Monitor.
// The Subscription channel is not closed by Unsubscribe.
func (m *monitorT) Unsubscribe(sub Subscription) {
s, ok := sub.(*subT)
if !ok {
return
}
m.mut.Lock()
_, ok = m.subs[s.idx]
if ok {
delete(m.subs, s.idx)
}
m.mut.Unlock()
}
// Run starts the Monitor.
func (m *monitorT) Run(ctx context.Context) (err error) {
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return m.sm.Run(gctx)
})
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case hits := <-m.sm.Output():
m.notify(ctx, hits)
}
}
return g.Wait()
}
func (m *monitorT) notify(ctx context.Context, hits []es.HitT) {
sz := len(hits)
if sz > 0 {
m.mut.RLock()
var wg sync.WaitGroup
wg.Add(len(m.subs))
for _, s := range m.subs {
go func(s *subT) {
defer wg.Done()
lc, cn := context.WithTimeout(ctx, m.subTimeout)
defer cn()
select {
case s.c <- hits:
zerolog.Ctx(ctx).Info().
Str("ctx", "subscription monitor").
Msg("received notification")
case <-lc.Done():
zerolog.Ctx(ctx).Error().
Err(lc.Err()).
Str("ctx", "subscription monitor").
Dur("timeout", m.subTimeout).
Msg("dropped notification")
}
}(s)
}
m.mut.RUnlock()
wg.Wait()
}
}