func()

in plugins/outputs/cloudwatchlogs/internal/pusher/queue.go [109:159]


func (q *queue) start() {
	defer q.wg.Done()
	mergeChan := make(chan logs.LogEvent)

	// Merge events from both blocking and non-blocking channel
	go func() {
		var nonBlockingEventsCh <-chan logs.LogEvent
		for {
			select {
			case e := <-q.eventsCh:
				mergeChan <- e
			case e := <-nonBlockingEventsCh:
				mergeChan <- e
			case <-q.startNonBlockCh:
				nonBlockingEventsCh = q.nonBlockingEventsCh
			case <-q.stop:
				return
			}
		}
	}()

	go q.manageFlushTimer()

	for {
		select {
		case e := <-mergeChan:
			// Start timer when first event of the batch is added (happens after a flush timer timeout)
			if len(q.batch.events) == 0 {
				q.resetFlushTimer()
			}
			event := q.converter.convert(e)
			if !q.batch.inTimeRange(event.timestamp) || !q.batch.hasSpace(event.eventBytes) {
				q.send()
			}
			q.batch.append(event)
		case <-q.flushCh:
			lastSentTime, _ := q.lastSentTime.Load().(time.Time)
			flushTimeout, _ := q.flushTimeout.Load().(time.Duration)
			if time.Since(lastSentTime) >= flushTimeout && len(q.batch.events) > 0 {
				q.send()
			} else {
				q.resetFlushTimer()
			}
		case <-q.stop:
			if len(q.batch.events) > 0 {
				q.send()
			}
			return
		}
	}
}