func()

in plugins/outputs/cloudwatchlogs/internal/pusher/sender.go [58:131]


func (s *sender) Send(batch *logEventBatch) {
	if len(batch.events) == 0 {
		return
	}
	input := batch.build()
	startTime := time.Now()

	retryCountShort := 0
	retryCountLong := 0
	for {
		output, err := s.service.PutLogEvents(input)
		if err == nil {
			if output.RejectedLogEventsInfo != nil {
				info := output.RejectedLogEventsInfo
				if info.TooOldLogEventEndIndex != nil {
					s.logger.Warnf("%d log events for log '%s/%s' are too old", *info.TooOldLogEventEndIndex, batch.Group, batch.Stream)
				}
				if info.TooNewLogEventStartIndex != nil {
					s.logger.Warnf("%d log events for log '%s/%s' are too new", *info.TooNewLogEventStartIndex, batch.Group, batch.Stream)
				}
				if info.ExpiredLogEventEndIndex != nil {
					s.logger.Warnf("%d log events for log '%s/%s' are expired", *info.ExpiredLogEventEndIndex, batch.Group, batch.Stream)
				}
			}
			batch.done()
			s.logger.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(batch.events), batch.Group, batch.Stream, batch.bufferedSize/1024, time.Since(startTime))
			return
		}

		var awsErr awserr.Error
		if !errors.As(err, &awsErr) {
			s.logger.Errorf("Non aws error received when sending logs to %v/%v: %v. CloudWatch agent will not retry and logs will be missing!", batch.Group, batch.Stream, err)
			return
		}

		switch e := awsErr.(type) {
		case *cloudwatchlogs.ResourceNotFoundException:
			if targetErr := s.targetManager.InitTarget(batch.Target); targetErr != nil {
				s.logger.Errorf("Unable to create log stream %v/%v: %v", batch.Group, batch.Stream, targetErr)
				break
			}
		case *cloudwatchlogs.InvalidParameterException,
			*cloudwatchlogs.DataAlreadyAcceptedException:
			s.logger.Errorf("%v, will not retry the request", e)
			return
		default:
			s.logger.Errorf("Aws error received when sending logs to %v/%v: %v", batch.Group, batch.Stream, awsErr)
		}

		// retry wait strategy depends on the type of error returned
		var wait time.Duration
		if chooseRetryWaitStrategy(err) == retryLong {
			wait = retryWaitLong(retryCountLong)
			retryCountLong++
		} else {
			wait = retryWaitShort(retryCountShort)
			retryCountShort++
		}

		if time.Since(startTime)+wait > s.RetryDuration() {
			s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
			return
		}

		s.logger.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait)

		select {
		case <-s.stop:
			s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
			return
		case <-time.After(wait):
		}
	}
}