def lambda_handler()

in source/update_ddb_from_stream/update_ddb_from_stream.py [0:0]


def lambda_handler(event, context):
    payload = event['Records']
    output = {}

    data = [base64.b64decode(record['kinesis']['data']).decode().strip().split(',') for record in payload]
    data = filter(lambda x: x[2]!="null", data)
    log.info(data)

    for metric_key,metric_group in groupby(data, key=lambda x:"{0}|{1}".format(x[0],x[1])):
        grouped_metric = list(metric_group)
        for category_key,grouped_rows in groupby(grouped_metric, key=lambda x: "{0}|{1}".format(x[2],x[3])):
            output.setdefault(metric_key, {})[category_key] = merge_record_values(metric_key, list(grouped_rows))

    for record_key in output:
        event_time,metric_type = record_key.split('|')
        record_data = OrderedDict(sorted(output[record_key].items(), key=itemgetter(1), reverse=True))

        ddb_record = client.get_item(TableName=table_name,
            Key={'MetricType': {'S':metric_type},
                 'EventTime':{'S':event_time} },
            ConsistentRead=True)

        if 'Item' not in ddb_record:
            put_record(metric_type,event_time, record_data)
        else:
            merged_data = merge_record_with_ddb(record_data, ddb_record)
            put_record_with_retry(metric_type, event_time, record_data, merged_data, int(ddb_record['Item']['ConcurrencyToken']['N']))
        if metric_type == calls_per_ip:
            max_ip = next(iter(record_data))
            max_ip_count = record_data[max_ip]

            max_ip = max_ip.split('|')[0]
            hour,minute,_ = event_time.split(':')

            ddb_max_ip = client.get_item(TableName=ip_table_name,
                Key={'Hour': {'S': hour},
                 'Minute':{'S':minute} },
                ConsistentRead=True)

            if 'Item' not in ddb_max_ip or max_ip_count > int(ddb_max_ip['Item']['MaxCount']['N']):
                client.put_item(TableName=ip_table_name,
                    Item={'Hour': {'S':hour},
                         'Minute':{'S':minute},
                         'IP':{'S':max_ip},
                         'MaxCount':{'N': str(max_ip_count)}} )
    if send_anonymous_data == "YES":
        try:
            unique_keys = list(set(output))
            for record_key in unique_keys:
                event_time,metric_type = record_key.split('|')
                if metric_type == successful_calls or metric_type == anomaly_score:
                    ddb_record = client.get_item(TableName=table_name,
                        Key={'MetricType': {'S':metric_type},
                             'EventTime':{'S':event_time} },
                        ConsistentRead=True)
                    del ddb_record["Item"]["ConcurrencyToken"]
                    del ddb_record["Item"]["EventTime"]
                    metric_data= {}
                    metric_data['MetricType'] = ddb_record['Item']['MetricType']['S']
                    if metric_type == successful_calls:
                        services, num_calls = ddb_record['Item']['Data']['S'].split(',')[0].split(':')
                        metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('}','').replace(' ', '')
                    if metric_type == anomaly_score:
                        num_calls,anomaly_data = ddb_record['Item']['Data']['S'].split(',')[0].split(':')
                        metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('{', '').replace('"', '').split('|')[0]
                        metric_data['AnamonlyScore'] = anomaly_data.replace('}', '')
                    sendAnonymousData(event_time,metric_data)
        except Exception as error:
            log.error(error)