in plugins/outputs/cloudwatchlogs/pusher.go [127:193]
func (p *pusher) start() {
ec := make(chan logs.LogEvent)
// Merge events from both blocking and non-blocking channel
go func() {
for {
select {
case e := <-p.eventsCh:
ec <- e
case e := <-p.nonBlockingEventsCh:
ec <- e
case <-p.startNonBlockCh:
case <-p.stop:
return
}
}
}()
for {
select {
case e := <-ec:
// Start timer when first event of the batch is added (happens after a flush timer timeout)
if len(p.events) == 0 {
p.resetFlushTimer()
}
ce := p.convertEvent(e)
et := time.Unix(*ce.Timestamp/1000, *ce.Timestamp%1000) // Cloudwatch Log Timestamp is in Millisecond
// A batch of log events in a single request cannot span more than 24 hours.
if (p.minT != nil && et.Sub(*p.minT) > 24*time.Hour) || (p.maxT != nil && p.maxT.Sub(et) > 24*time.Hour) {
p.send()
}
size := len(*ce.Message) + eventHeaderSize
if p.bufferredSize+size > reqSizeLimit || len(p.events) == reqEventsLimit {
p.send()
}
if len(p.events) > 0 && *ce.Timestamp < *p.events[len(p.events)-1].Timestamp {
p.needSort = true
}
p.events = append(p.events, ce)
p.doneCallbacks = append(p.doneCallbacks, e.Done)
p.bufferredSize += size
if p.minT == nil || p.minT.After(et) {
p.minT = &et
}
if p.maxT == nil || p.maxT.Before(et) {
p.maxT = &et
}
case <-p.flushTimer.C:
if time.Since(p.lastSentTime) >= p.FlushTimeout && len(p.events) > 0 {
p.send()
} else {
p.resetFlushTimer()
}
case <-p.stop:
if len(p.events) > 0 {
p.send()
}
return
}
}
}