in fluent-bit-kinesis.go [186:204]
// FLBPluginFlushCtx unpacks the records carried in data and flushes them
// through the plugin instance looked up from ctx, returning the resulting
// status code.
//
// If unpackRecords reports anything other than output.FLB_OK, the failure is
// logged together with the tag and that status is returned unchanged.
// Otherwise the records go through FlushConcurrent when the instance's
// Concurrency is greater than zero, and through Flush when it is not.
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	plugin := getPluginInstance(ctx)
	tagName := C.GoString(tag)

	records, total, status := unpackRecords(plugin, data, length)
	if status != output.FLB_OK {
		logrus.Errorf("[kinesis %d] failed to unpackRecords with tag: %s\n", plugin.PluginID, tagName)
		return status
	}

	logrus.Debugf("[kinesis %d] Flushing %d logs with tag: %s\n", plugin.PluginID, total, tagName)

	// Pick the flush path from the instance's configured concurrency.
	switch {
	case plugin.Concurrency > 0:
		return plugin.FlushConcurrent(total, records)
	default:
		return plugin.Flush(&records)
	}
}