func()

in internal/flavors/publisher.go [55:91]


// HandleEvents accumulates the event batches received on ch and hands the
// accumulated buffer to p.publish. It blocks until either ctx is done or ch
// is closed; in both cases it calls p.publish one last time with whatever is
// currently buffered and then returns.
//
// While running, the buffer is published when:
//   - the flush ticker (period p.interval) fires and the buffer is non-empty, or
//   - the buffer length reaches p.threshold after appending a received batch.
//
// NOTE(review): p.publish receives a pointer to the buffer and is presumably
// responsible for clearing it after sending — confirm against its implementation.
func (p *Publisher) HandleEvents(ctx context.Context, ch <-chan []beat.Event) {
	var eventsToSend []beat.Event

	flushTicker := time.NewTicker(p.interval)
	// Release the ticker on every return path; without this it keeps running
	// after the handler exits.
	defer flushTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			p.log.Warnf("Publisher context is done: %v", ctx.Err())
			p.publish(&eventsToSend)
			return

		// Flush events to ES after a pre-defined interval, meant to clean residuals after a cycle is finished.
		case <-flushTicker.C:
			if len(eventsToSend) == 0 {
				continue
			}

			p.log.Infof("Publisher time interval reached")
			p.publish(&eventsToSend)

		// Flush events to ES when reaching a certain threshold
		case batch, ok := <-ch:
			if !ok {
				p.log.Warn("Publisher channel is closed")
				p.publish(&eventsToSend)
				return
			}

			eventsToSend = append(eventsToSend, batch...)
			if len(eventsToSend) < p.threshold {
				continue
			}

			p.log.Infof("Publisher buffer threshold:%d reached", p.threshold)
			p.publish(&eventsToSend)
		}
	}
}