func FLBPluginFlushCtx()

in fluent-bit-firehose.go [109:154]


// FLBPluginFlushCtx decodes the records contained in data, passes each one to
// the plugin instance looked up from ctx, and then flushes that instance.
//
// The timestamp handed to AddRecord is taken from the decoded record: an
// output.FLBTime is used as-is, a uint64 is interpreted as whole seconds since
// the Unix epoch, and any other type falls back to the current time.
//
// It returns output.FLB_OK when every AddRecord call and the final Flush
// report output.FLB_OK. Otherwise the first non-OK code is returned unchanged;
// after a failing AddRecord no further records are read and Flush is not
// called.
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Wrap the raw buffer in a Fluent Bit decoder.
	decoder := output.NewDecoder(data, int(length))

	plugin := getPluginInstance(ctx)
	tagName := C.GoString(tag)
	logrus.Debugf("[firehose %d] Found logs with tag: %s", plugin.PluginID, tagName)

	var (
		processed int
		// Declared outside the loop so every AddRecord call receives the
		// address of the same variable.
		eventTime time.Time
	)

	for {
		status, rawTime, entry := output.GetRecord(decoder)
		if status != 0 {
			// Any non-zero status from the decoder ends the loop.
			break
		}

		switch v := rawTime.(type) {
		case output.FLBTime:
			eventTime = v.Time
		case uint64:
			// A uint64 value appears to be the number of seconds since
			// the Unix epoch.
			eventTime = time.Unix(int64(v), 0)
		default:
			eventTime = time.Now()
		}

		if rc := plugin.AddRecord(entry, &eventTime); rc != output.FLB_OK {
			return rc
		}
		processed++
	}

	if rc := plugin.Flush(); rc != output.FLB_OK {
		return rc
	}
	logrus.Debugf("[firehose %d] Processed %d events with tag %s", plugin.PluginID, processed, tagName)

	return output.FLB_OK
}