in fluent-bit-kinesis.go [206:252]
func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}
count := 0
records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
// Create Fluent Bit decoder
dec := output.NewDecoder(data, int(length))
for {
//Extract Record
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}
switch tts := ts.(type) {
case output.FLBTime:
timestamp = tts.Time
case uint64:
// when ts is of type uint64 it appears to
// be the amount of seconds since unix epoch.
timestamp = time.Unix(int64(tts), 0)
default:
timestamp = time.Now()
}
retCode := kinesisOutput.AddRecord(&records, record, ×tamp)
if retCode != output.FLB_OK {
return nil, 0, retCode
}
count++
}
if kinesisOutput.IsAggregate() {
retCode := kinesisOutput.FlushAggregatedRecords(&records)
if retCode != output.FLB_OK {
return nil, 0, retCode
}
}
return records, count, output.FLB_OK
}