def lambda_handler()

in source/update_ddb_from_stream/update_ddb_from_stream.py [0:0]


def lambda_handler(event, context):
    log.debug('event: %s', event)
    payload = event['records']
    output = {}
    output_array = []
    log.debug('processing %s records', len(payload))

    # event_time = str(datetime.now())

    for record in payload:

        decoded_data = base64.b64decode(record['data']).decode("utf-8")

        log.debug('decoded_data: %s', decoded_data)

        data = [decoded_data.strip().split(',')]
        data = [x for x in data if x[2]!="null"]

        for metric_key, metric_group in groupby(data, key=lambda x:"{0}|{1}".format(x[0],x[1])):
            grouped_metric = list(metric_group)
            for category_key, grouped_rows in groupby(grouped_metric, key=lambda x: "{0}|{1}".format(x[2],x[3])):
                output.setdefault(metric_key, {})[category_key] = merge_record_values(metric_key, list(grouped_rows))

        for record_key in output:
            log.debug('record_key: %s', record_key)
            event_time, metric_type = record_key.split('|')
            log.debug('event_time: %s', event_time)
            log.debug('metric_type: %s', metric_type)
            record_data = OrderedDict(sorted(iter(output[record_key].items()), key=itemgetter(1), reverse=True))

            ddb_record = client.get_item(
                TableName=table_name,
                Key={
                    'MetricType': {'S':metric_type},
                    'EventTime': {'S':event_time}
                },
                ConsistentRead=True
            )

            if 'Item' not in ddb_record:
                put_record(metric_type, event_time, record_data)
            else:
                merged_data = merge_record_with_ddb(record_data, ddb_record)
                put_record_with_retry(metric_type, event_time, record_data, merged_data, int(ddb_record['Item']['ConcurrencyToken']['N']))

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(decoded_data.encode("utf-8")).decode("utf-8")
        }
        output_array.append(output_record)

    if send_anonymous_data == "TRUE":
        try:
            metric_data = {}
            metric_data['RecordsProcessed'] = len(payload)
            sendAnonymousData(event_time, metric_data)
        except Exception as error:
            log.error('send_anonymous_data error: %s', error)
    else:
        log.info('Anonymous usage metrics collection disabled.')

    log.debug('returning records: %s', output_array)

    return {'records': output_array}