func()

in kinesis/kinesis.go [348:386]


func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis.PutRecordsRequestEntry) {
	var retCode, tries int

	currentRetries := outputPlugin.getConcurrentRetries()
	outputPlugin.addGoroutineCount(1)

	for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ {
		if currentRetries > 0 {
			// Wait if other goroutines are retrying, as well as implement a progressive backoff
			if currentRetries > uint32(outputPlugin.concurrencyRetryLimit) {
				time.Sleep(time.Duration((1<<uint32(outputPlugin.concurrencyRetryLimit))*100) * time.Millisecond)
			} else {
				time.Sleep(time.Duration((1<<currentRetries)*100) * time.Millisecond)
			}
		}

		logrus.Debugf("[kinesis %d] Sending (%d) records, currentRetries=(%d)", outputPlugin.PluginID, len(records), currentRetries)
		retCode = outputPlugin.Flush(&records)
		if retCode != output.FLB_RETRY {
			break
		}
		currentRetries = outputPlugin.addConcurrentRetries(1)
		logrus.Infof("[kinesis %d] Going to retry with (%d) records, currentRetries=(%d)", outputPlugin.PluginID, len(records), currentRetries)
	}

	outputPlugin.addGoroutineCount(-1)
	if tries > 0 {
		outputPlugin.addConcurrentRetries(-tries)
	}

	switch retCode {
	case output.FLB_ERROR:
		logrus.Errorf("[kinesis %d] Failed to send (%d) records with error", outputPlugin.PluginID, len(records))
	case output.FLB_RETRY:
		logrus.Errorf("[kinesis %d] Failed to send (%d) records after retries %d", outputPlugin.PluginID, len(records), outputPlugin.concurrencyRetryLimit)
	case output.FLB_OK:
		logrus.Debugf("[kinesis %d] Flushed %d records\n", outputPlugin.PluginID, count)
	}
}