notifications/sinks.go
package notifications
import (
"container/list"
"errors"
"fmt"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"
)
const DefaultBroadcasterFanoutTimeout = 15 * time.Second
// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. That
// said, the tight integration with endpoint metrics should be removed.
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
sinks []Sink
eventsCh chan *Event
doneCh chan struct{}
fanoutTimeout time.Duration
wg *sync.WaitGroup
}
// NewBroadcaster creates a Broadcaster that dispatches events to the given
// sinks. The broadcaster's behavior is affected by the properties of each
// sink: generally, a sink should accept all messages and handle reliability
// on its own, for example by being wrapped in an eventQueue or retryingSink.
// A zero fanoutTimeout falls back to DefaultBroadcasterFanoutTimeout.
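//
// A minimal usage sketch (s1 and s2 are assumed to implement Sink):
//
//	b := NewBroadcaster(0, s1, s2) // 0 falls back to the default fanout timeout
//	if err := b.Write(&Event{}); err != nil {
//		// ErrSinkClosed is returned once the broadcaster has been closed.
//	}
//	_ = b.Close()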
func NewBroadcaster(fanoutTimeout time.Duration, sinks ...Sink) *Broadcaster {
if fanoutTimeout == 0 {
fanoutTimeout = DefaultBroadcasterFanoutTimeout
}
b := Broadcaster{
sinks: sinks,
eventsCh: make(chan *Event),
doneCh: make(chan struct{}),
fanoutTimeout: fanoutTimeout,
wg: new(sync.WaitGroup),
}
// Start the broadcaster
b.wg.Add(1)
go b.run()
return &b
}
// Write accepts an event to be dispatched to all sinks. It blocks until the
// event is accepted by the broadcast loop and returns ErrSinkClosed if the
// broadcaster has been closed. The caller cedes ownership of the event to the
// broadcaster and should not modify it after calling Write.
func (b *Broadcaster) Write(event *Event) error {
// NOTE(prozlach): avoid a racy situation when both channels are "ready",
// and make sure that closing the Sink takes priority:
select {
case <-b.doneCh:
return ErrSinkClosed
default:
select {
case b.eventsCh <- event:
case <-b.doneCh:
return ErrSinkClosed
}
}
return nil
}
// Close the broadcaster, waiting up to the fanout timeout for in-flight
// events to be flushed to the underlying sinks before closing them.
func (b *Broadcaster) Close() error {
log.Infof("broadcaster: closing")
select {
case <-b.doneCh:
// already closed
return fmt.Errorf("broadcaster: already closed")
default:
close(b.doneCh)
}
b.wg.Wait()
errs := new(multierror.Error)
for _, sink := range b.sinks {
if err := sink.Close(); err != nil {
errs = multierror.Append(errs, err)
log.WithError(err).Errorf("broadcaster: error closing sink %v", sink)
}
}
// NOTE(prozlach): not strictly necessary, just basic hygiene
close(b.eventsCh)
log.Debugf("broadcaster: closed")
return errs.ErrorOrNil()
}
// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
defer b.wg.Done()
loop:
for {
select {
case event := <-b.eventsCh:
sinksCount := len(b.sinks)
// NOTE(prozlach): the broadcaster only has sinks if there are endpoints
// configured. Ideally the broadcaster should not exist at all when there
// are no endpoints configured, but this would require a bigger refactoring.
if sinksCount == 0 {
log.Debugf("broadcaster: there are no sinks configured, dropping event %v", event)
continue loop
}
// NOTE(prozlach): The approach here is a compromise between the
// existing behavior of Broadcaster (__attempt__ to reliably
// deliver to all dependent sinks) and making Broadcaster
// interruptible so that graceful shutdown of container registry
// is possible.
// The idea is to do Write() calls in goroutines (they are blocking)
// and, if the termination signal is received, wait up to
// fanoutTimeout and then terminate the `run()` goroutine, which in
// turn unblocks the `Close()` call to close all sinks, which as an
// effect terminates the goroutines doing the `Write()` calls.
finishedCount := 0
finishedCh := make(chan struct{}, sinksCount)
for i := 0; i < sinksCount; i++ {
go func(i int) {
if err := b.sinks[i].Write(event); err != nil {
log.WithError(err).
Errorf("broadcaster: error writing events to %v, these events will be lost", b.sinks[i])
}
finishedCh <- struct{}{}
}(i)
}
inner:
for {
// nolint: revive // max-control-nesting
select {
case <-b.doneCh:
timer := time.NewTimer(b.fanoutTimeout)
log.WithField("sinks_remaining", sinksCount-finishedCount).
Warnf("broadcaster: received termination signal")
select {
case <-timer.C:
log.WithField("sinks_remaining", sinksCount-finishedCount).
Warnf("broadcaster: queue purge timeout reached, sink broadcasts dropped")
return
case <-finishedCh:
finishedCount += 1
if finishedCount == sinksCount {
// All notifications were sent before the timeout
// was reached. We are done here.
return
}
}
case <-finishedCh:
finishedCount += 1
if finishedCount == sinksCount {
// All done!
break inner
}
}
}
case <-b.doneCh:
return
}
}
}
// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
sink Sink
listeners []eventQueueListener
doneCh chan struct{}
bufferInCh chan *Event
bufferOutCh chan *Event
queuePurgeTimeout time.Duration
wgBufferer *sync.WaitGroup
wgSender *sync.WaitGroup
}
// eventQueueListener is called when various events happen on the queue.
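//
// A minimal listener sketch (hypothetical counterListener, assuming
// sync/atomic is imported; it is not part of this file):
//
//	type counterListener struct{ pending int64 }
//
//	func (c *counterListener) ingress(*Event) { atomic.AddInt64(&c.pending, 1) }
//	func (c *counterListener) egress(*Event)  { atomic.AddInt64(&c.pending, -1) }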
type eventQueueListener interface {
ingress(events *Event)
egress(events *Event)
}
// newEventQueue returns a queue writing to the provided sink. If listeners
// are provided, they are called to update pending metrics on ingress and
// egress.
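//
// A minimal usage sketch (s is assumed to implement Sink):
//
//	eq := newEventQueue(s, 5*time.Second)
//	_ = eq.Write(&Event{})
//	_ = eq.Close() // flushes buffered events, waiting up to queuePurgeTimeout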
func newEventQueue(sink Sink, queuePurgeTimeout time.Duration, listeners ...eventQueueListener) *eventQueue {
eq := eventQueue{
sink: sink,
listeners: listeners,
doneCh: make(chan struct{}),
bufferInCh: make(chan *Event),
bufferOutCh: make(chan *Event),
queuePurgeTimeout: queuePurgeTimeout,
wgBufferer: new(sync.WaitGroup),
wgSender: new(sync.WaitGroup),
}
eq.wgSender.Add(1)
eq.wgBufferer.Add(1)
go eq.sender()
go eq.bufferer()
return &eq
}
// Write accepts an event into the queue, only failing if the queue has
// been closed.
func (eq *eventQueue) Write(event *Event) error {
// NOTE(prozlach): avoid a racy situation when both channels are "ready",
// and make sure that closing the Sink takes priority:
select {
case <-eq.doneCh:
return ErrSinkClosed
default:
select {
case eq.bufferInCh <- event:
case <-eq.doneCh:
return ErrSinkClosed
}
}
return nil
}
func (eq *eventQueue) bufferer() {
defer eq.wgBufferer.Done()
defer log.Debugf("eventQueue bufferer: closed")
events := list.New()
// The main loop is executed during normal operation. When the buffer contains
// events, the select additionally waits on a write to the sender goroutine;
// otherwise it only waits for incoming events.
main:
for {
if events.Len() < 1 {
// List is empty, wait for an event
select {
case event := <-eq.bufferInCh:
for _, listener := range eq.listeners {
listener.ingress(event)
}
events.PushBack(event)
case <-eq.doneCh:
break main
}
} else {
front := events.Front()
select {
case event := <-eq.bufferInCh:
for _, listener := range eq.listeners {
listener.ingress(event)
}
events.PushBack(event)
case eq.bufferOutCh <- front.Value.(*Event):
events.Remove(front)
case <-eq.doneCh:
break main
}
}
}
timer := time.NewTimer(eq.queuePurgeTimeout)
log.WithField("remaining_events", events.Len()).
Warnf("eventqueue: received termination signal")
// This loop is executed only during the termination phase. Its purpose is to
// try to send all unsent notifications within the given time window.
loop:
for events.Len() > 0 {
front := events.Front()
select {
case eq.bufferOutCh <- front.Value.(*Event):
events.Remove(front)
case <-timer.C:
break loop
}
}
// NOTE(prozlach): We are done, tell sender to wrap it up too.
close(eq.bufferOutCh)
// NOTE(prozlach): queue is terminating, if there are still events in the
// buffer, let the operator know they were lost:
for events.Len() > 0 {
front := events.Front()
event := front.Value.(*Event)
log.Warnf("eventqueue: event lost: %v", event)
events.Remove(front)
}
}
func (eq *eventQueue) sender() {
defer eq.wgSender.Done()
defer log.Debugf("eventQueue sender: closed")
for {
event, isOpen := <-eq.bufferOutCh
if !isOpen {
break
}
if err := eq.sink.Write(event); err != nil {
log.WithError(err).
WithField("event", event).
Warnf("eventqueue: event lost")
}
for _, listener := range eq.listeners {
listener.egress(event)
}
}
}
// Close shuts down the event queue, flushing buffered events to the
// underlying sink (within the purge timeout) before closing it.
func (eq *eventQueue) Close() error {
log.Infof("event queue: closing")
select {
case <-eq.doneCh:
// already closed
return fmt.Errorf("eventqueue: already closed")
default:
close(eq.doneCh)
}
// NOTE(prozlach): the order of operations is important here: we first wait
// for the bufferer to stop accepting and flushing events, and only then close
// the underlying sink so that a blocked sender goroutine gets unblocked and
// notices that the termination signal came.
eq.wgBufferer.Wait()
err := eq.sink.Close()
eq.wgSender.Wait()
// NOTE(prozlach): not strictly necessary, just basic hygiene.
// NOTE(prozlach): we MUST NOT close the eq.bufferInCh channel before the
// waiting goroutines had a chance to err out or send their event, otherwise
// we would cause panics.
close(eq.bufferInCh)
return err
}
// ignoredSink discards events with ignored target media types and actions,
// and passes the rest along.
type ignoredSink struct {
Sink
ignoreMediaTypes map[string]bool
ignoreActions map[string]bool
}
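// newIgnoredSink wraps sink so that events matching the ignored media types
// or actions are silently dropped.
//
// A minimal usage sketch (s is assumed to implement Sink and event is an
// assumed *Event):
//
//	filtered := newIgnoredSink(s,
//		[]string{"application/octet-stream"}, // target media types to drop
//		[]string{"pull"},                     // actions to drop
//	)
//	_ = filtered.Write(event) // dropped silently if event matches either list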
func newIgnoredSink(sink Sink, ignored, ignoreActions []string) Sink {
if len(ignored) == 0 && len(ignoreActions) == 0 {
return sink
}
ignoredMap := make(map[string]bool)
for _, mediaType := range ignored {
ignoredMap[mediaType] = true
}
ignoredActionsMap := make(map[string]bool)
for _, action := range ignoreActions {
ignoredActionsMap[action] = true
}
return &ignoredSink{
Sink: sink,
ignoreMediaTypes: ignoredMap,
ignoreActions: ignoredActionsMap,
}
}
// Write discards an event with ignored target media types or passes the event
// along.
func (imts *ignoredSink) Write(event *Event) error {
if event == nil {
return nil
}
if imts.ignoreMediaTypes[event.Target.MediaType] {
return nil
}
if imts.ignoreActions[event.Action] {
return nil
}
return imts.Sink.Write(event)
}
// retryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must have a non-zero probability of
// succeeding or the sink will block. Internally, a circuit breaker with a
// failure threshold and backoff manages the retries. Concurrent calls to a
// retrying sink are serialized through the sink, meaning that if one is
// in-flight, another will not proceed.
type retryingSink struct {
sink Sink
doneCh chan struct{}
eventsCh chan *Event
errCh chan error
wg *sync.WaitGroup
// circuit breaker heuristics
failures struct {
threshold int
backoff time.Duration // time after which we retry after failure.
}
}
// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
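//
// A minimal usage sketch (s is assumed to implement Sink):
//
//	rs := newRetryingSink(s, 3, 10*time.Second)
//	_ = rs.Write(&Event{}) // blocks until delivered or the sink is closed
//	_ = rs.Close()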
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
rs := &retryingSink{
sink: sink,
doneCh: make(chan struct{}),
eventsCh: make(chan *Event),
errCh: make(chan error),
wg: new(sync.WaitGroup),
}
rs.failures.threshold = threshold
rs.failures.backoff = backoff
rs.wg.Add(1)
go rs.run()
return rs
}
func (rs *retryingSink) run() {
defer rs.wg.Done()
main:
for {
// nolint: revive // max-control-nesting
select {
case <-rs.doneCh:
return
case event := <-rs.eventsCh:
for failuresCount := 0; failuresCount < rs.failures.threshold; failuresCount++ {
select {
case <-rs.doneCh:
rs.errCh <- ErrSinkClosed
return
default:
}
err := rs.sink.Write(event)
// Event sent successfully, fetch next event from channel:
if err == nil {
rs.errCh <- nil
continue main
}
// Underlying sink is closed, let's wrap up:
if errors.Is(err, ErrSinkClosed) {
rs.errCh <- ErrSinkClosed
return
}
log.WithError(err).
WithField("railure_count", failuresCount).
Error("retryingsink: error writing event, retrying")
}
log.WithField("sink", rs.sink).
Warnf("encountered too many errors when writing to sink, enabling backoff")
for {
// NOTE(prozlach): We can't use Ticker here as the write()
// operation may take longer than the backoff period and this
// would result in triggering new write imediatelly after the
// previous one.
timer := time.NewTimer(rs.failures.backoff)
select {
case <-rs.doneCh:
rs.errCh <- ErrSinkClosed
timer.Stop()
return
case lastFailureTime := <-timer.C:
err := rs.sink.Write(event)
// Event sent successfully, fetch next event from channel:
if err == nil {
rs.errCh <- nil
continue main
}
// Underlying sink is closed, let's wrap up:
if errors.Is(err, ErrSinkClosed) {
rs.errCh <- ErrSinkClosed
return
}
log.WithError(err).
WithField("next_retry_time", lastFailureTime.Add(rs.failures.backoff).String()).
Error("retryingsink: error writing event, backing off")
}
}
}
}
}
// Write attempts to flush the event to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(event *Event) error {
// NOTE(prozlach): avoid a racy situation when both channels are "ready",
// and make sure that closing the Sink takes priority:
select {
case <-rs.doneCh:
return ErrSinkClosed
default:
select {
case rs.eventsCh <- event:
return <-rs.errCh
case <-rs.doneCh:
return ErrSinkClosed
}
}
}
// Close closes the sink and the underlying sink.
func (rs *retryingSink) Close() error {
log.Infof("retryingSink: closing")
select {
case <-rs.doneCh:
return fmt.Errorf("retryingSink: already closed")
default:
close(rs.doneCh)
}
rs.wg.Wait()
err := rs.sink.Close()
// NOTE(prozlach): not strictly necessary, just basic hygiene
close(rs.eventsCh)
close(rs.errCh)
log.Debugf("retryingSink: closed")
return err
}
// backoffSink attempts to write an event to the given sink, retrying with
// exponential backoff up to the maximum number of retries defined in the
// configuration, and drops the event once that limit is reached.
type backoffSink struct {
doneCh chan struct{}
sink Sink
backoff backoff.BackOff
}
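// newBackoffSink wraps sink with an exponential backoff retry policy capped
// at maxRetries retries.
//
// A minimal usage sketch (s is assumed to implement Sink):
//
//	bs := newBackoffSink(s, 500*time.Millisecond, 5)
//	_ = bs.Write(&Event{}) // retried with exponential backoff, dropped after 5 retries
//	_ = bs.Close()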
func newBackoffSink(sink Sink, initialInterval time.Duration, maxRetries int) *backoffSink {
b := backoff.NewExponentialBackOff()
b.InitialInterval = initialInterval
return &backoffSink{
doneCh: make(chan struct{}),
sink: sink,
// nolint: gosec
backoff: backoff.WithMaxRetries(b, uint64(maxRetries)),
}
}
// Write attempts to flush the event to the downstream sink using an
// exponential backoff strategy. If the max number of retries is
// reached, an error is returned and the event is dropped.
// It returns early if the sink is closed.
func (bs *backoffSink) Write(event *Event) error {
op := func() error {
select {
case <-bs.doneCh:
return backoff.Permanent(ErrSinkClosed)
default:
}
if err := bs.sink.Write(event); err != nil {
log.WithError(err).Error("backoffSink: error writing event")
return err
}
return nil
}
return backoff.Retry(op, bs.backoff)
}
// Close closes the sink and the underlying sink.
func (bs *backoffSink) Close() error {
log.Infof("backoffSink: closing")
select {
case <-bs.doneCh:
// already closed
return fmt.Errorf("backoffSink: already closed")
default:
// NOTE(prozlach): not strictly necessary, just basic hygiene
close(bs.doneCh)
}
return bs.sink.Close()
}