func FLBPluginFlushCtx()

in fluent-bit-cloudwatch.go [161:215]


func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	var count int
	var ret int
	var ts interface{}
	var record map[interface{}]interface{}

	// Create Fluent Bit decoder
	dec := output.NewDecoder(data, int(length))

	cloudwatchLogs := getPluginInstance(ctx)

	fluentTag := C.GoString(tag)
	logrus.Debugf("[cloudwatch %d] Found logs with tag: %s", cloudwatchLogs.PluginInstanceID, fluentTag)

	for {
		// Extract Record
		ret, ts, record = output.GetRecord(dec)
		if ret != 0 {
			break
		}

		var timestamp time.Time
		switch tts := ts.(type) {
		case output.FLBTime:
			timestamp = tts.Time
		case uint64:
			// when ts is of type uint64 it appears to
			// be the amount of seconds since unix epoch.
			timestamp = time.Unix(int64(tts), 0)
		default:
			timestamp = time.Now()
		}

		retCode := cloudwatchLogs.AddEvent(&cloudwatch.Event{Tag: fluentTag, Record: record, TS: timestamp})
		if retCode != output.FLB_OK {
			return retCode
		}
		count++
	}
	err := cloudwatchLogs.Flush()
	if err != nil {
		fmt.Println(err)
		// TODO: Better error handling
		return output.FLB_RETRY
	}

	logrus.Debugf("[cloudwatch %d] Processed %d events", cloudwatchLogs.PluginInstanceID, count)

	// Return options:
	//
	// output.FLB_OK    = data have been processed.
	// output.FLB_ERROR = unrecoverable error, do not try this again.
	// output.FLB_RETRY = retry to flush later.
	return output.FLB_OK
}