func()

in aggregate/aggregator.go [124:159]


func (a *Aggregator) AggregateRecords() (entry *kinesis.PutRecordsRequestEntry, err error) {

	if len(a.records) == 0 {
		return nil, nil
	}

	pkeys := a.getPartitionKeys()

	agg := &AggregatedRecord{
		PartitionKeyTable: pkeys,
		Records:           a.records,
	}

	protoBufData, err := proto.Marshal(agg)
	if err != nil {
		logrus.Errorf("Failed to encode record: %v", err)
		return nil, err
	}

	md5Sum := md5.New()
	md5Sum.Write(protoBufData)
	md5CheckSum := md5Sum.Sum(nil)

	kclData := append(kclMagicNumber, protoBufData...)
	kclData = append(kclData, md5CheckSum...)

	logrus.Debugf("[kinesis ] Aggregated (%d) records of size (%d) with total size (%d), partition key (%s)\n", len(a.records), a.getSize(), len(kclData), pkeys[0])

	// Clear buffer if aggregation didn't fail
	a.clearBuffer()

	return &kinesis.PutRecordsRequestEntry{
		Data:         kclData,
		PartitionKey: aws.String(pkeys[0]),
	}, nil
}