in plugins/outputs/cloudwatchlogs/internal/pusher/queue.go [109:159]
// start runs the queue's main loop until q.stop is signalled. It launches a
// merger goroutine that funnels q.eventsCh and (once enabled through
// q.startNonBlockCh) q.nonBlockingEventsCh into a single channel, launches
// q.manageFlushTimer, and then appends incoming events to the current batch,
// calling q.send when the batch cannot accept the next event, when a flush
// signal is due, or on stop with events still pending. q.wg.Done is called
// when the loop exits.
func (q *queue) start() {
	defer q.wg.Done()
	mergeChan := make(chan logs.LogEvent)

	// Merge events from both blocking and non-blocking channel.
	go func() {
		// forward hands e to the main loop. It reports false when stop is
		// signalled first: after the main loop has returned nobody receives
		// from mergeChan, so an unconditional send here would park this
		// goroutine forever.
		forward := func(e logs.LogEvent) bool {
			select {
			case mergeChan <- e:
				return true
			case <-q.stop:
				return false
			}
		}

		// A nil channel is never ready in a select, so the non-blocking
		// source stays disabled until startNonBlockCh fires.
		var nonBlockingEventsCh <-chan logs.LogEvent
		for {
			select {
			case e := <-q.eventsCh:
				if !forward(e) {
					return
				}
			case e := <-nonBlockingEventsCh:
				if !forward(e) {
					return
				}
			case <-q.startNonBlockCh:
				nonBlockingEventsCh = q.nonBlockingEventsCh
			case <-q.stop:
				return
			}
		}
	}()

	go q.manageFlushTimer()

	for {
		select {
		case e := <-mergeChan:
			// Start timer when first event of the batch is added (happens after a flush timer timeout).
			if len(q.batch.events) == 0 {
				q.resetFlushTimer()
			}
			event := q.converter.convert(e)
			// Send the current batch first if this event does not fit its
			// time range or remaining space, then append the event.
			if !q.batch.inTimeRange(event.timestamp) || !q.batch.hasSpace(event.eventBytes) {
				q.send()
			}
			q.batch.append(event)
		case <-q.flushCh:
			// A failed type assertion (value never stored) yields the zero
			// value, which is why the ok results are deliberately ignored.
			lastSentTime, _ := q.lastSentTime.Load().(time.Time)
			flushTimeout, _ := q.flushTimeout.Load().(time.Duration)
			if time.Since(lastSentTime) >= flushTimeout && len(q.batch.events) > 0 {
				q.send()
			} else {
				// Nothing to flush yet, or a send happened recently: re-arm the timer.
				q.resetFlushTimer()
			}
		case <-q.stop:
			// Send whatever is still buffered before shutting down.
			if len(q.batch.events) > 0 {
				q.send()
			}
			return
		}
	}
}