in fluent-bit-cloudwatch.go [161:215]
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
var ret int
var ts interface{}
var record map[interface{}]interface{}
// Create Fluent Bit decoder
dec := output.NewDecoder(data, int(length))
cloudwatchLogs := getPluginInstance(ctx)
fluentTag := C.GoString(tag)
logrus.Debugf("[cloudwatch %d] Found logs with tag: %s", cloudwatchLogs.PluginInstanceID, fluentTag)
for {
// Extract Record
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}
var timestamp time.Time
switch tts := ts.(type) {
case output.FLBTime:
timestamp = tts.Time
case uint64:
// when ts is of type uint64 it appears to
// be the amount of seconds since unix epoch.
timestamp = time.Unix(int64(tts), 0)
default:
timestamp = time.Now()
}
retCode := cloudwatchLogs.AddEvent(&cloudwatch.Event{Tag: fluentTag, Record: record, TS: timestamp})
if retCode != output.FLB_OK {
return retCode
}
count++
}
err := cloudwatchLogs.Flush()
if err != nil {
fmt.Println(err)
// TODO: Better error handling
return output.FLB_RETRY
}
logrus.Debugf("[cloudwatch %d] Processed %d events", cloudwatchLogs.PluginInstanceID, count)
// Return options:
//
// output.FLB_OK = data have been processed.
// output.FLB_ERROR = unrecoverable error, do not try this again.
// output.FLB_RETRY = retry to flush later.
return output.FLB_OK
}