plugins/outputs/cloudwatchlogs/internal/pusher/queue.go (190 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package pusher import ( "sync" "sync/atomic" "time" "github.com/influxdata/telegraf" "github.com/aws/amazon-cloudwatch-agent/logs" "github.com/aws/amazon-cloudwatch-agent/profiler" ) type Queue interface { AddEvent(e logs.LogEvent) AddEventNonBlocking(e logs.LogEvent) } type queue struct { target Target logger telegraf.Logger entityProvider logs.LogEntityProvider sender Sender converter *converter batch *logEventBatch eventsCh chan logs.LogEvent nonBlockingEventsCh chan logs.LogEvent flushCh chan struct{} resetTimerCh chan struct{} flushTimer *time.Timer flushTimeout atomic.Value stop <-chan struct{} lastSentTime atomic.Value initNonBlockingChOnce sync.Once startNonBlockCh chan struct{} wg *sync.WaitGroup } func newQueue( logger telegraf.Logger, target Target, flushTimeout time.Duration, entityProvider logs.LogEntityProvider, sender Sender, stop <-chan struct{}, wg *sync.WaitGroup, ) Queue { q := &queue{ target: target, logger: logger, converter: newConverter(logger, target), batch: newLogEventBatch(target, entityProvider), sender: sender, eventsCh: make(chan logs.LogEvent, 100), flushCh: make(chan struct{}), resetTimerCh: make(chan struct{}), flushTimer: time.NewTimer(flushTimeout), stop: stop, startNonBlockCh: make(chan struct{}), wg: wg, } q.flushTimeout.Store(flushTimeout) q.wg.Add(1) go q.start() return q } // AddEvent adds an event to the queue blocking if full. func (q *queue) AddEvent(e logs.LogEvent) { if !hasValidTime(e) { q.logger.Errorf("The log entry in (%v/%v) with timestamp (%v) comparing to the current time (%v) is out of accepted time range. Discard the log entry.", q.target.Group, q.target.Stream, e.Time(), time.Now()) return } q.eventsCh <- e } // AddEventNonBlocking adds an event to the queue without blocking. If the queue is full, drops the oldest event in // the queue. func (q *queue) AddEventNonBlocking(e logs.LogEvent) { if !hasValidTime(e) { q.logger.Errorf("The log entry in (%v/%v) with timestamp (%v) comparing to the current time (%v) is out of accepted time range. Discard the log entry.", q.target.Group, q.target.Stream, e.Time(), time.Now()) return } q.initNonBlockingChOnce.Do(func() { q.nonBlockingEventsCh = make(chan logs.LogEvent, reqEventsLimit*2) q.startNonBlockCh <- struct{}{} // Unblock the select loop to recognize the channel merge }) // Drain the channel until new event can be added for { select { case q.nonBlockingEventsCh <- e: return default: <-q.nonBlockingEventsCh q.addStats("emfMetricDrop", 1) } } } // start is the main loop for processing events and managing the queue. func (q *queue) start() { defer q.wg.Done() mergeChan := make(chan logs.LogEvent) // Merge events from both blocking and non-blocking channel go func() { var nonBlockingEventsCh <-chan logs.LogEvent for { select { case e := <-q.eventsCh: mergeChan <- e case e := <-nonBlockingEventsCh: mergeChan <- e case <-q.startNonBlockCh: nonBlockingEventsCh = q.nonBlockingEventsCh case <-q.stop: return } } }() go q.manageFlushTimer() for { select { case e := <-mergeChan: // Start timer when first event of the batch is added (happens after a flush timer timeout) if len(q.batch.events) == 0 { q.resetFlushTimer() } event := q.converter.convert(e) if !q.batch.inTimeRange(event.timestamp) || !q.batch.hasSpace(event.eventBytes) { q.send() } q.batch.append(event) case <-q.flushCh: lastSentTime, _ := q.lastSentTime.Load().(time.Time) flushTimeout, _ := q.flushTimeout.Load().(time.Duration) if time.Since(lastSentTime) >= flushTimeout && len(q.batch.events) > 0 { q.send() } else { q.resetFlushTimer() } case <-q.stop: if len(q.batch.events) > 0 { q.send() } return } } } // send the current batch of events. func (q *queue) send() { if len(q.batch.events) > 0 { q.batch.addDoneCallback(q.onSuccessCallback(q.batch.bufferedSize)) q.sender.Send(q.batch) q.batch = newLogEventBatch(q.target, q.entityProvider) } } // onSuccessCallback returns a callback function to be executed after a successful send. func (q *queue) onSuccessCallback(bufferedSize int) func() { return func() { q.lastSentTime.Store(time.Now()) go q.addStats("rawSize", float64(bufferedSize)) q.resetFlushTimer() } } // addStats adds statistics to the profiler. func (q *queue) addStats(statsName string, value float64) { statsKey := []string{"cloudwatchlogs", q.target.Group, statsName} profiler.Profiler.AddStats(statsKey, value) } // manageFlushTimer manages the flush timer for the queue. Needed since the timer Stop/Reset functions cannot // be called concurrently. func (q *queue) manageFlushTimer() { for { select { case <-q.flushTimer.C: q.flushCh <- struct{}{} case <-q.resetTimerCh: q.stopFlushTimer() if flushTimeout, ok := q.flushTimeout.Load().(time.Duration); ok { q.flushTimer.Reset(flushTimeout) } case <-q.stop: q.stopFlushTimer() return } } } // stopFlushTimer stops the timer and attempts to drain it. func (q *queue) stopFlushTimer() { if !q.flushTimer.Stop() { select { case <-q.flushTimer.C: default: } } } // resetFlushTimer sends a reset timer request if there isn't already one pending. func (q *queue) resetFlushTimer() { select { case q.resetTimerCh <- struct{}{}: default: } } func hasValidTime(e logs.LogEvent) bool { //http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents //* None of the log events in the logEventBatch can be more than 2 hours in the future. //* None of the log events in the logEventBatch can be older than 14 days or the retention period of the log group. if !e.Time().IsZero() { now := time.Now() dt := now.Sub(e.Time()).Hours() if dt > 24*14 || dt < -2 { return false } } return true }