in python/aws_kinesis_agg/deaggregator.py [0:0]
def iter_deaggregate_records(records, data_format=None):
    """Generator function - Given a set of Kinesis records, deaggregate them one at a time
    using the Kinesis aggregated message format. This method will not affect any
    records that are not aggregated (but will still return them).

    records - The list of raw Kinesis records to deaggregate. (list of dict)
    data_format - Optional input-format hint; pass 'Boto3' when the records came
                  directly from a boto3 Kinesis client, whose 'Data' payload is
                  raw bytes rather than base64-encoded text. (str or None)

    return value - Each yield returns a single Kinesis user record. (dict)"""
    # We got a single record...try to coerce it to a list
    if isinstance(records, collections.abc.Mapping):
        records = [records]

    for r in records:
        is_aggregated = True
        sub_seq_num = 0

        # BUG FIX: the original condition read
        #     'kinesis' not in r and 'data' in r or 'Data' in r
        # which Python parses as (A and B) or C, so any record containing a
        # 'Data' key was pushed through a format converter even when it
        # already had the Kinesis Stream 'kinesis' shape.  Parenthesize the
        # 'or' so the 'kinesis' guard applies to both key spellings.
        if 'kinesis' not in r and ('data' in r or 'Data' in r):
            # Kinesis Analytics preprocessors & Firehose transformers use a different format for aggregated
            # Kinesis Stream records, so we're going to convert KA / KF style records to KS style records.
            if 'kinesisStreamRecordMetadata' in r:
                # Kinesis Analytics style record
                r = _convert_from_ka_format(r)
            elif 'kinesisRecordMetadata' in r:
                # Kinesis Firehose style record
                r = _convert_from_kf_format(r)
            elif data_format == 'Boto3':
                # Boto3 Kinesis client style record
                r = _convert_from_boto3_format(r)

        # Decode the incoming data (boto3 records are already raw bytes)
        raw_data = r['kinesis']['data']
        decoded_data = base64.b64decode(raw_data) if data_format != 'Boto3' else raw_data

        # Verify the magic header
        data_magic = None
        if len(decoded_data) >= len(aws_kinesis_agg.MAGIC):
            data_magic = decoded_data[:len(aws_kinesis_agg.MAGIC)]
        else:
            # Too short to even hold the magic prefix
            is_aggregated = False

        decoded_data_no_magic = decoded_data[len(aws_kinesis_agg.MAGIC):]

        # An aggregated payload must start with the magic bytes and leave room
        # for the trailing MD5 digest after them.
        if aws_kinesis_agg.MAGIC != data_magic or len(decoded_data_no_magic) <= aws_kinesis_agg.DIGEST_SIZE:
            is_aggregated = False

        if is_aggregated:
            # verify the MD5 digest appended after the protobuf payload
            message_digest = decoded_data_no_magic[-aws_kinesis_agg.DIGEST_SIZE:]
            message_data = decoded_data_no_magic[:-aws_kinesis_agg.DIGEST_SIZE]

            md5_calc = hashlib.md5()
            md5_calc.update(message_data)
            calculated_digest = md5_calc.digest()

            if message_digest != calculated_digest:
                # Digest mismatch: treat as a plain (non-aggregated) record
                is_aggregated = False
            else:
                # Extract the protobuf message
                try:
                    ar = aws_kinesis_agg.kpl_pb2.AggregatedRecord()
                    ar.ParseFromString(message_data)

                    pks = ar.partition_key_table
                    ehks = ar.explicit_hash_key_table

                    try:
                        # Split out all the aggregated records into individual records
                        for mr in ar.records:
                            new_record = _create_user_record(ehks, pks, mr, r, sub_seq_num)
                            sub_seq_num += 1
                            yield new_record
                    except Exception as e:
                        # Best-effort: report the failure and continue with the
                        # next input record instead of aborting the generator.
                        error_string = _get_error_string(r, message_data, ehks, pks, ar)
                        print('ERROR: %s\n%s' % (str(e), error_string), file=sys.stderr)
                except google.protobuf.message.DecodeError:
                    # Looked aggregated but isn't valid protobuf; pass it through
                    is_aggregated = False

        if not is_aggregated:
            # Non-aggregated records are yielded back unchanged
            yield r

    return