in kinesis/kinesis.go [348:386]
func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis.PutRecordsRequestEntry) {
var retCode, tries int
currentRetries := outputPlugin.getConcurrentRetries()
outputPlugin.addGoroutineCount(1)
for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ {
if currentRetries > 0 {
// Wait if other goroutines are retrying, as well as implement a progressive backoff
if currentRetries > uint32(outputPlugin.concurrencyRetryLimit) {
time.Sleep(time.Duration((1<<uint32(outputPlugin.concurrencyRetryLimit))*100) * time.Millisecond)
} else {
time.Sleep(time.Duration((1<<currentRetries)*100) * time.Millisecond)
}
}
logrus.Debugf("[kinesis %d] Sending (%d) records, currentRetries=(%d)", outputPlugin.PluginID, len(records), currentRetries)
retCode = outputPlugin.Flush(&records)
if retCode != output.FLB_RETRY {
break
}
currentRetries = outputPlugin.addConcurrentRetries(1)
logrus.Infof("[kinesis %d] Going to retry with (%d) records, currentRetries=(%d)", outputPlugin.PluginID, len(records), currentRetries)
}
outputPlugin.addGoroutineCount(-1)
if tries > 0 {
outputPlugin.addConcurrentRetries(-tries)
}
switch retCode {
case output.FLB_ERROR:
logrus.Errorf("[kinesis %d] Failed to send (%d) records with error", outputPlugin.PluginID, len(records))
case output.FLB_RETRY:
logrus.Errorf("[kinesis %d] Failed to send (%d) records after retries %d", outputPlugin.PluginID, len(records), outputPlugin.concurrencyRetryLimit)
case output.FLB_OK:
logrus.Debugf("[kinesis %d] Flushed %d records\n", outputPlugin.PluginID, count)
}
}