func()

in plugins/outputs/cloudwatchlogs/pusher.go [127:193]


// start runs the pusher's event loop until p.stop is signalled.
//
// A helper goroutine merges p.eventsCh and p.nonBlockingEventsCh into a single
// internal channel. For every merged event the loop converts it, flushes the
// pending batch via p.send when adding the event would violate a batch limit
// (24-hour time span, reqSizeLimit bytes, reqEventsLimit events), and then
// appends the event to the batch. The batch is also flushed when the flush
// timer fires and at least p.FlushTimeout has passed since p.lastSentTime.
// Events still pending when p.stop is signalled are sent before returning.
func (p *pusher) start() {
	ec := make(chan logs.LogEvent)

	// Merge events from both blocking and non-blocking channel
	go func() {
		// forward hands e to the main loop. It reports false when p.stop is
		// signalled first, so this goroutine cannot block forever on a send
		// that nobody will receive once the main loop has returned.
		// This relies on p.stop being observable by both goroutines (e.g. a
		// closed channel), which the two pre-existing receives already assume.
		forward := func(e logs.LogEvent) bool {
			select {
			case ec <- e:
				return true
			case <-p.stop:
				return false
			}
		}

		for {
			select {
			case e := <-p.eventsCh:
				if !forward(e) {
					return
				}
			case e := <-p.nonBlockingEventsCh:
				if !forward(e) {
					return
				}
			case <-p.startNonBlockCh:
				// Nothing to do; receiving only starts a new select iteration.
				// NOTE(review): presumably this lets the select pick up a newly
				// assigned p.nonBlockingEventsCh — confirm at the sender.
			case <-p.stop:
				return
			}
		}
	}()

	for {
		select {
		case e := <-ec:
			// Start timer when first event of the batch is added (happens after a flush timer timeout)
			if len(p.events) == 0 {
				p.resetFlushTimer()
			}

			ce := p.convertEvent(e)

			// Cloudwatch Log Timestamp is in milliseconds, while time.Unix
			// expects (seconds, nanoseconds): scale the remainder accordingly.
			ms := *ce.Timestamp
			et := time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond))

			// A batch of log events in a single request cannot span more than 24 hours.
			if (p.minT != nil && et.Sub(*p.minT) > 24*time.Hour) || (p.maxT != nil && p.maxT.Sub(et) > 24*time.Hour) {
				p.send()
			}

			// Flush first if this event would push the batch over the size or
			// count limit.
			size := len(*ce.Message) + eventHeaderSize
			if p.bufferredSize+size > reqSizeLimit || len(p.events) == reqEventsLimit {
				p.send()
			}

			// Record that the batch is no longer in timestamp order.
			if len(p.events) > 0 && *ce.Timestamp < *p.events[len(p.events)-1].Timestamp {
				p.needSort = true
			}

			p.events = append(p.events, ce)
			p.doneCallbacks = append(p.doneCallbacks, e.Done)
			p.bufferredSize += size

			// Track the earliest and latest event time in the current batch.
			if p.minT == nil || p.minT.After(et) {
				p.minT = &et
			}
			if p.maxT == nil || p.maxT.Before(et) {
				p.maxT = &et
			}

		case <-p.flushTimer.C:
			// Only flush when the timeout has really elapsed since the last
			// send and there is something to send; otherwise re-arm the timer.
			if time.Since(p.lastSentTime) >= p.FlushTimeout && len(p.events) > 0 {
				p.send()
			} else {
				p.resetFlushTimer()
			}

		case <-p.stop:
			// Flush whatever is still buffered before shutting down.
			if len(p.events) > 0 {
				p.send()
			}
			return
		}
	}
}