func unpackRecords()

in fluent-bit-kinesis.go [206:252]


// unpackRecords decodes the Fluent Bit payload referenced by data and length
// and passes every decoded record to kinesisOutput.AddRecord, which receives a
// pointer to the slice of entries being built.
//
// It returns the entries slice, the number of records successfully handed to
// AddRecord, and a status code. If AddRecord or FlushAggregatedRecords returns
// anything other than output.FLB_OK, that code is returned together with a nil
// slice and a zero count.
func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
	var (
		entries   = make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
		processed int
		// recordTime is declared outside the loop so that AddRecord is handed
		// the address of the same variable on every iteration.
		recordTime time.Time
	)

	decoder := output.NewDecoder(data, int(length))

	for {
		status, rawTime, fields := output.GetRecord(decoder)
		if status != 0 {
			// Any non-zero status from GetRecord ends the decoding loop.
			break
		}

		// Normalise the decoded timestamp into a time.Time.
		switch v := rawTime.(type) {
		case uint64:
			// A uint64 timestamp is interpreted as whole seconds since the
			// Unix epoch.
			recordTime = time.Unix(int64(v), 0)
		case output.FLBTime:
			recordTime = v.Time
		default:
			// Unrecognised timestamp type: fall back to the current time.
			recordTime = time.Now()
		}

		if rc := kinesisOutput.AddRecord(&entries, fields, &recordTime); rc != output.FLB_OK {
			return nil, 0, rc
		}
		processed++
	}

	// In aggregate mode, flush once after all records have been added.
	if kinesisOutput.IsAggregate() {
		if rc := kinesisOutput.FlushAggregatedRecords(&entries); rc != output.FLB_OK {
			return nil, 0, rc
		}
	}

	return entries, processed, output.FLB_OK
}