func()

in aggregate/aggregator.go [51:121]


func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {

	if hasPartitionKey {
		partitionKeySize := len([]byte(partitionKey))
		if partitionKeySize < 1 {
			return nil, fmt.Errorf("Invalid partition key provided")
		}
	}

	dataSize := len(data)

	// If this is a very large record, then don't aggregate it.
	if dataSize >= a.maxAggRecordSize {
		if !hasPartitionKey {
			partitionKey = a.stringGen.RandomString()
		}
		return &kinesis.PutRecordsRequestEntry{
			Data:         data,
			PartitionKey: aws.String(partitionKey),
		}, nil
	}

	if !hasPartitionKey {
		if len(a.partitionKeys) > 0 {
			// Take any partition key from the map, as long as one exists
			for k, _ := range a.partitionKeys {
				partitionKey = k
				break
			}
		} else {
			partitionKey = a.stringGen.RandomString()
		}
	}

	// Check if we need to add a new partition key, and if we do how much space it will take
	pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey)

	// data field size is proto size of data + data field number size
	// partition key field size is varint of index size + field number size
	dataFieldSize := protowire.SizeBytes(dataSize) + fieldNumberSize
	pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize
	// Total size is byte size of data + pkey field + field number of parent proto

	if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= maximumRecordSize {
		// Aggregate records, and return if error
		entry, err = a.AggregateRecords()
		if err != nil {
			return entry, err
		}

		if !hasPartitionKey {
			// choose a new partition key if needed now that we've aggregated the previous records
			partitionKey = a.stringGen.RandomString()
		}
		// Recompute field size, since it changed
		pKeyIdx, _ = a.checkPartitionKey(partitionKey)
		pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize
	}

	// Add new record, and update aggSize
	partitionKeyIndex := a.addPartitionKey(partitionKey)

	a.records = append(a.records, &Record{
		Data:              data,
		PartitionKeyIndex: &partitionKeyIndex,
	})

	a.aggSize += protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize

	return entry, err
}