in aggregate/aggregator.go [51:121]
// AddRecord adds data to the aggregation buffer, or hands it back as a
// standalone entry when it is too large to aggregate.
//
// When hasPartitionKey is false the partitionKey argument is ignored and a key
// is chosen here: a key already present in a.partitionKeys when there is one,
// otherwise a fresh one from a.stringGen.RandomString().
//
// The returned entry is non-nil in two situations:
//   - len(data) >= a.maxAggRecordSize: data is not buffered; it is returned
//     directly as its own PutRecordsRequestEntry.
//   - buffering data would bring the aggregate to maximumRecordSize or beyond:
//     the result of a.AggregateRecords() is returned and data is then buffered
//     for the next aggregate.
//
// In every other case data is buffered and a nil entry is returned.
//
// An error is returned when hasPartitionKey is true but partitionKey is empty,
// or when a.AggregateRecords() fails; in the latter case the entry and error
// it produced are passed through unchanged and data is not buffered.
func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {
	// len of a string is already its size in bytes; no []byte conversion needed.
	if hasPartitionKey && len(partitionKey) == 0 {
		return nil, fmt.Errorf("Invalid partition key provided")
	}

	dataSize := len(data)

	// If this is a very large record, then don't aggregate it.
	if dataSize >= a.maxAggRecordSize {
		if !hasPartitionKey {
			partitionKey = a.stringGen.RandomString()
		}
		return &kinesis.PutRecordsRequestEntry{
			Data:         data,
			PartitionKey: aws.String(partitionKey),
		}, nil
	}

	if !hasPartitionKey {
		if len(a.partitionKeys) > 0 {
			// Take any partition key from the map, as long as one exists.
			// Reusing a key that is already tracked presumably avoids growing
			// the key table — confirm against checkPartitionKey.
			for k := range a.partitionKeys {
				partitionKey = k
				break
			}
		} else {
			partitionKey = a.stringGen.RandomString()
		}
	}

	// Check if we need to add a new partition key, and if we do how much space it will take.
	pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey)

	// Data field size is the proto size of the data plus its field number size.
	// Partition key field size is the varint size of the key index plus its field number size.
	dataFieldSize := protowire.SizeBytes(dataSize) + fieldNumberSize
	pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize

	// Size of this record once embedded in the parent proto:
	// length-delimited (data field + pkey field) plus the parent's field number.
	recordSize := protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize

	if a.getSize()+recordSize+pKeyAddedSize >= maximumRecordSize {
		// The buffer cannot take this record: aggregate what is buffered, and return if error.
		entry, err = a.AggregateRecords()
		if err != nil {
			return entry, err
		}

		if !hasPartitionKey {
			// Choose a new partition key if needed now that we've aggregated the previous records.
			partitionKey = a.stringGen.RandomString()
		}

		// The key's index may differ after aggregation, so recompute the sizes
		// that depend on it.
		pKeyIdx, _ = a.checkPartitionKey(partitionKey)
		pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize
		recordSize = protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize
	}

	// Add new record, and update aggSize.
	partitionKeyIndex := a.addPartitionKey(partitionKey)

	a.records = append(a.records, &Record{
		Data:              data,
		PartitionKeyIndex: &partitionKeyIndex,
	})

	a.aggSize += recordSize

	// err is necessarily nil here; entry is nil unless an aggregate was just produced above.
	return entry, nil
}