in kinesis/kinesis.go [391:409]
// FlushConcurrent hands a batch of records to a background goroutine running
// FlushWithRetries, unless the plugin is currently too busy to accept it.
//
// It returns output.FLB_RETRY without starting any work when either:
//   - one more goroutine would exceed outputPlugin.Concurrency, or
//   - getConcurrentRetries reports a value greater than zero.
//
// Otherwise it launches FlushWithRetries(count, records) asynchronously and
// returns output.FLB_OK immediately, without waiting for that call to finish.
//
// NOTE(review): the goroutine count is only read here, never incremented
// before the goroutine is started; presumably FlushWithRetries maintains the
// counter — confirm, since otherwise back-to-back calls could exceed the limit.
func (outputPlugin *OutputPlugin) FlushConcurrent(count int, records []*kinesis.PutRecordsRequestEntry) int {
	// Guard 1: refuse the batch if starting one more goroutine would go over the limit.
	if active := outputPlugin.getGoroutineCount(); active+1 > int32(outputPlugin.Concurrency) {
		logrus.Infof("[kinesis %d] flush returning retry, concurrency limit reached (%d)\n", outputPlugin.PluginID, active)
		return output.FLB_RETRY
	}

	// Guard 2: refuse the batch while any retries are reported as in progress.
	if pendingRetries := outputPlugin.getConcurrentRetries(); pendingRetries > 0 {
		logrus.Infof("[kinesis %d] flush returning retry, kinesis retries in progress (%d)\n", outputPlugin.PluginID, pendingRetries)
		return output.FLB_RETRY
	}

	// Both guards passed: flush in the background and report success right away.
	go outputPlugin.FlushWithRetries(count, records)
	return output.FLB_OK
}