in python/aws_kinesis_agg/aggregator.py [0:0]
def add_user_record(self, partition_key, data, explicit_hash_key=None):
    """Try to append one user record to this aggregated record.

    String inputs (anything matching six.string_types) are UTF-8 encoded
    before the record size is calculated; values of any other type are
    used as given.

    Args:
        partition_key - The partition key of the new user record to add
        data - The raw data of the new user record to add
        explicit_hash_key - The explicit hash key of the new user record
            to add, or None (the default) when the record has none

    Returns:
        True if the new user record was added to this aggregated record,
        or False if adding it would take this aggregated record past
        self.max_size (the record is not added in that case).

    Raises:
        ValueError - if the new user record on its own is larger than
            self.max_size."""

    def _encode_if_string(value):
        # Strings become UTF-8 bytes; everything else (including None) passes through.
        if isinstance(value, six.string_types):
            return value.encode('utf-8')
        return value

    pk_bytes = _encode_if_string(partition_key)
    ehk_bytes = _encode_if_string(explicit_hash_key)
    payload = _encode_if_string(data)

    # Size checks come first so that a rejected record leaves the aggregate untouched.
    record_size = self._calculate_record_size(pk_bytes, payload, ehk_bytes)
    if record_size > self.max_size:
        # This record could never fit, even into an empty aggregated record.
        raise ValueError(
            'Input record (PK=%s, EHK=%s, SizeBytes=%d) too big to fit inside a single agg record.' %
            (partition_key, explicit_hash_key, record_size))
    if self.get_size_bytes() + record_size > self.max_size:
        # It fits in principle, just not on top of what is already stored here.
        return False

    new_record = self.agg_record.records.add()
    new_record.data = payload

    # The key tables receive the keys exactly as the caller passed them, not
    # the encoded copies used for sizing. For add_key's result, item 0 decides
    # whether the key is appended to the table and item 1 is stored as the
    # record's index.
    pk_outcome = self.partition_keys.add_key(partition_key)
    if pk_outcome[0]:
        self.agg_record.partition_key_table.append(partition_key)
    new_record.partition_key_index = pk_outcome[1]

    # Falsy explicit hash keys (None, empty) are not registered at all.
    if explicit_hash_key:
        ehk_outcome = self.explicit_hash_keys.add_key(explicit_hash_key)
        if ehk_outcome[0]:
            self.agg_record.explicit_hash_key_table.append(explicit_hash_key)
        new_record.explicit_hash_key_index = ehk_outcome[1]

    self._agg_size_bytes += record_size

    # The first user record lends its partition key and explicit hash key
    # to the aggregated record as a whole.
    is_first_record = len(self.agg_record.records) == 1
    if is_first_record:
        self._agg_partition_key = partition_key
        self._agg_explicit_hash_key = explicit_hash_key

    return True