// Excerpt from plugins/outputs/cloudwatchlogs/pusher.go (lines 210-306).

// send flushes the buffered log events to CloudWatch Logs via PutLogEvents,
// retrying retriable AWS errors until p.RetryDuration has elapsed.
// On success it invokes the registered done callbacks (newest first), records
// throughput stats, and clears the buffer. On a non-retriable failure (or when
// retries are exhausted) the buffered events are dropped WITHOUT invoking the
// done callbacks, and an error is logged.
func (p *pusher) send() {
	defer p.resetFlushTimer() // Reset the flush timer after sending the request
	if p.needSort {
		// PutLogEvents requires events in chronological order.
		sort.Stable(ByTimestamp(p.events))
	}

	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     p.events,
		LogGroupName:  &p.Group,
		LogStreamName: &p.Stream,
		SequenceToken: p.sequenceToken,
	}

	startTime := time.Now()

	retryCount := 0
	for {
		// p.sequenceToken may have been refreshed by a previous iteration
		// (e.g. after InvalidSequenceTokenException), so re-apply it on
		// every attempt.
		input.SequenceToken = p.sequenceToken
		output, err := p.Service.PutLogEvents(input)
		if err == nil {
			if output.NextSequenceToken != nil {
				p.sequenceToken = output.NextSequenceToken
			}
			// Surface any events the service accepted the request for but
			// rejected individually (too old / too new / past retention).
			if output.RejectedLogEventsInfo != nil {
				info := output.RejectedLogEventsInfo
				if info.TooOldLogEventEndIndex != nil {
					p.Log.Warnf("%d log events for log '%s/%s' are too old", *info.TooOldLogEventEndIndex, p.Group, p.Stream)
				}
				if info.TooNewLogEventStartIndex != nil {
					p.Log.Warnf("%d log events for log '%s/%s' are too new", *info.TooNewLogEventStartIndex, p.Group, p.Stream)
				}
				if info.ExpiredLogEventEndIndex != nil {
					p.Log.Warnf("%d log events for log '%s/%s' are expired", *info.ExpiredLogEventEndIndex, p.Group, p.Stream)
				}
			}
			// Invoke completion callbacks in reverse registration order.
			for i := len(p.doneCallbacks) - 1; i >= 0; i-- {
				done := p.doneCallbacks[i]
				done()
			}

			p.Log.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(p.events), p.Group, p.Stream, p.bufferredSize/1024, time.Since(startTime))
			p.addStats("rawSize", float64(p.bufferredSize))

			p.reset()
			p.lastSentTime = time.Now()

			return
		}

		awsErr, ok := err.(awserr.Error)
		if !ok {
			p.Log.Errorf("Non aws error received when sending logs to %v/%v: %v. CloudWatch agent will not retry and logs will be missing!", p.Group, p.Stream, err)
			// Messages will be discarded but done callbacks not called
			p.reset()
			return
		}

		switch e := awsErr.(type) {
		case *cloudwatchlogs.ResourceNotFoundException:
			// Group/stream is missing; create it, then fall through to retry.
			if err := p.createLogGroupAndStream(); err != nil {
				// Log the creation error itself rather than the original
				// ResourceNotFound message so the real cause is visible.
				p.Log.Errorf("Unable to create log stream %v/%v: %v", p.Group, p.Stream, err)
				break
			}
			p.putRetentionPolicy()
		case *cloudwatchlogs.InvalidSequenceTokenException:
			if p.sequenceToken == nil {
				p.Log.Infof("First time sending logs to %v/%v since startup so sequenceToken is nil, learned new token:(%v): %v", p.Group, p.Stream, e.ExpectedSequenceToken, e.Message())
			} else {
				p.Log.Warnf("Invalid SequenceToken used (%v) while sending logs to %v/%v, will use new token and retry: %v", p.sequenceToken, p.Group, p.Stream, e.Message())
			}
			if e.ExpectedSequenceToken == nil {
				p.Log.Errorf("Failed to find sequence token from aws response while sending logs to %v/%v: %v", p.Group, p.Stream, e.Message())
			}
			// Adopt the token the service expects (may be nil) and retry.
			p.sequenceToken = e.ExpectedSequenceToken
		case *cloudwatchlogs.InvalidParameterException,
			*cloudwatchlogs.DataAlreadyAcceptedException:
			// Non-retriable: the request is malformed or already accepted.
			p.Log.Errorf("%v, will not retry the request", e)
			p.reset()
			return
		default:
			p.Log.Errorf("Aws error received when sending logs to %v/%v: %v", p.Group, p.Stream, awsErr)
		}

		// Back off; give up once total elapsed time would exceed RetryDuration.
		wait := retryWait(retryCount)
		if time.Since(startTime)+wait > p.RetryDuration {
			p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream)
			p.reset()
			return
		}

		p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCount, wait)
		time.Sleep(wait)
		retryCount++
	}
}