in internal/flavors/publisher.go [55:91]
func (p *Publisher) HandleEvents(ctx context.Context, ch <-chan []beat.Event) {
var eventsToSend []beat.Event
flushTicker := time.NewTicker(p.interval)
for {
select {
case <-ctx.Done():
p.log.Warnf("Publisher context is done: %v", ctx.Err())
p.publish(&eventsToSend)
return
// Flush events to ES after a pre-defined interval, meant to clean residuals after a cycle is finished.
case <-flushTicker.C:
if len(eventsToSend) == 0 {
continue
}
p.log.Infof("Publisher time interval reached")
p.publish(&eventsToSend)
// Flush events to ES when reaching a certain threshold
case event, ok := <-ch:
if !ok {
p.log.Warn("Publisher channel is closed")
p.publish(&eventsToSend)
return
}
eventsToSend = append(eventsToSend, event...)
if len(eventsToSend) < p.threshold {
continue
}
p.log.Infof("Publisher buffer threshold:%d reached", p.threshold)
p.publish(&eventsToSend)
}
}
}