in aggregate/aggregator.go [124:159]
// AggregateRecords packs every buffered record into one aggregated payload
// and returns it as a single PutRecords entry.
//
// The payload is laid out as kclMagicNumber, then the protobuf encoding of the
// AggregatedRecord, then the MD5 digest of that protobuf encoding. The entry's
// partition key is the first element of the partition key table.
//
// It returns (nil, nil) when nothing is buffered. If protobuf marshalling
// fails, the error is logged and returned and the buffer is left as-is so the
// records are not lost; on success the buffer is cleared.
func (a *Aggregator) AggregateRecords() (entry *kinesis.PutRecordsRequestEntry, err error) {
	if len(a.records) == 0 {
		return nil, nil
	}

	pkeys := a.getPartitionKeys()
	agg := &AggregatedRecord{
		PartitionKeyTable: pkeys,
		Records:           a.records,
	}

	protoBufData, err := proto.Marshal(agg)
	if err != nil {
		logrus.Errorf("Failed to encode record: %v", err)
		return nil, err
	}

	md5CheckSum := md5.Sum(protoBufData)

	// Assemble the payload in a freshly allocated, exactly-sized buffer.
	// Appending directly onto kclMagicNumber would write into its backing
	// array whenever that slice has spare capacity, corrupting the shared
	// value; copying it into a new buffer rules that out and needs only one
	// allocation.
	kclData := make([]byte, 0, len(kclMagicNumber)+len(protoBufData)+len(md5CheckSum))
	kclData = append(kclData, kclMagicNumber...)
	kclData = append(kclData, protoBufData...)
	kclData = append(kclData, md5CheckSum[:]...)

	// Log before clearing: the record count and size are read from the buffer.
	logrus.Debugf("[kinesis ] Aggregated (%d) records of size (%d) with total size (%d), partition key (%s)\n", len(a.records), a.getSize(), len(kclData), pkeys[0])

	// Clear buffer if aggregation didn't fail
	a.clearBuffer()

	return &kinesis.PutRecordsRequestEntry{
		Data:         kclData,
		PartitionKey: aws.String(pkeys[0]),
	}, nil
}