def iter_deaggregate_records()

in python/aws_kinesis_agg/deaggregator.py


# Module-level imports this function relies on (the private helpers such as
# _convert_from_ka_format and _create_user_record are defined elsewhere in
# this same module)
import base64
import collections.abc
import hashlib
import sys

import google.protobuf.message

import aws_kinesis_agg
import aws_kinesis_agg.kpl_pb2


def iter_deaggregate_records(records, data_format=None):
    """Generator function - Given a set of Kinesis records, deaggregate them one at a time
    using the Kinesis aggregated message format.  This method will not affect any
    records that are not aggregated (but will still return them).
    
    records - The list of raw Kinesis records to deaggregate. (list of dict)
    
    return value - Each yield returns a single Kinesis user record. (dict)"""
    
    # We got a single record...try to coerce it to a list
    if isinstance(records, collections.abc.Mapping):
        records = [records]
        
    for r in records:
        is_aggregated = True
        sub_seq_num = 0

        if 'kinesis' not in r and ('data' in r or 'Data' in r):
            # Kinesis Analytics preprocessors & Firehose transformers use a different format for aggregated
            # Kinesis Stream records, so we're going to convert KA / KF style records to KS style records.
            if 'kinesisStreamRecordMetadata' in r:
                # Kinesis Analytics style record
                r = _convert_from_ka_format(r)
            elif 'kinesisRecordMetadata' in r:
                # Kinesis Firehose style record
                r = _convert_from_kf_format(r)
            elif data_format == 'Boto3':
                # Boto3 Kinesis client style record
                r = _convert_from_boto3_format(r)

        # Decode the incoming data
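        # (Lambda event records carry base64-encoded payloads, while boto3
        # get_records() responses already hold raw bytes, so Boto3-format
        # input is not decoded again below)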
        raw_data = r['kinesis']['data']

        decoded_data = base64.b64decode(raw_data) if data_format != 'Boto3' else raw_data
        
        # Verify the magic header
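        # (KPL aggregated payloads are framed as a magic prefix, a
        # protobuf-encoded AggregatedRecord message, and a trailing MD5
        # digest of the protobuf bytes)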
        data_magic = None
        if len(decoded_data) >= len(aws_kinesis_agg.MAGIC):
            data_magic = decoded_data[:len(aws_kinesis_agg.MAGIC)]
        else:
            is_aggregated = False
        
        decoded_data_no_magic = decoded_data[len(aws_kinesis_agg.MAGIC):]
        
        if aws_kinesis_agg.MAGIC != data_magic or len(decoded_data_no_magic) <= aws_kinesis_agg.DIGEST_SIZE:
            is_aggregated = False
            
        if is_aggregated:            
            
            # verify the MD5 digest
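            # the trailing DIGEST_SIZE bytes (16 for MD5) are the checksum;
            # everything before them is the serialized AggregatedRecord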
            message_digest = decoded_data_no_magic[-aws_kinesis_agg.DIGEST_SIZE:]
            message_data = decoded_data_no_magic[:-aws_kinesis_agg.DIGEST_SIZE]

            md5_calc = hashlib.md5()
            md5_calc.update(message_data)
            calculated_digest = md5_calc.digest()
            
            if message_digest != calculated_digest:            
                is_aggregated = False            
            else:                            
                # Extract the protobuf message
                try:    
                    ar = aws_kinesis_agg.kpl_pb2.AggregatedRecord()
                    ar.ParseFromString(message_data)
                    
                    pks = ar.partition_key_table
                    ehks = ar.explicit_hash_key_table
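                    # Partition keys and explicit hash keys are stored once in
                    # these shared tables; each sub-record references them by
                    # index, which is how aggregation saves space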
                    
                    try:                    
                        # Split out all the aggregated records into individual records    
                        for mr in ar.records:                                                    
                            new_record = _create_user_record(ehks, pks, mr, r, sub_seq_num)
                            sub_seq_num += 1
                            yield new_record
                        
                    except Exception as e:        
                        
                        error_string = _get_error_string(r, message_data, ehks, pks, ar)
                        print('ERROR: %s\n%s' % (str(e), error_string), file=sys.stderr)
                    
                except google.protobuf.message.DecodeError:                    
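                    # The payload passed the framing checks but is not a
                    # parseable AggregatedRecord, so treat it as a plain
                    # (non-aggregated) record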
                    is_aggregated = False
        
        if not is_aggregated:
            yield r
    
    return
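
Typical usage, sketched below: draining the generator from an AWS Lambda handler attached to a Kinesis stream. The handler name and the print-based processing are illustrative assumptions, not part of this module; only iter_deaggregate_records itself comes from it.

import base64

from aws_kinesis_agg.deaggregator import iter_deaggregate_records


def lambda_handler(event, context):
    # Hypothetical handler: event['Records'] is the standard Kinesis-to-Lambda
    # event shape
    for record in iter_deaggregate_records(event['Records']):
        # Yielded records look like ordinary Kinesis records, so the payload
        # is still base64-encoded and must be decoded before use
        payload = base64.b64decode(record['kinesis']['data'])
        print(payload)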