# in source/update_ddb_from_stream/update_ddb_from_stream.py [0:0]
def lambda_handler(event, context):
    """Aggregate a batch of Kinesis metric records and persist them to DynamoDB.

    Each record's base64 payload is decoded and split on commas. Field 0 is
    the event time and field 1 the metric type (together the "metric key");
    fields 2 and 3 together form the "category key". Rows whose field 2 is
    the literal string ``"null"`` are discarded.

    Processing steps:

    1. Rows are grouped by metric key and category key, and every group is
       reduced to a single value with ``merge_record_values``.
    2. For each metric key the categories are ordered by merged value
       (descending) and written to ``table_name``: a fresh item is created
       with ``put_record`` when none exists, otherwise the batch is merged
       with the stored item and written via ``put_record_with_retry`` using
       the stored ``ConcurrencyToken``.
    3. For the ``calls_per_ip`` metric, the top category of the batch is
       written to ``ip_table_name`` when no item exists for that
       hour/minute or when its count exceeds the stored ``MaxCount``.
    4. When ``send_anonymous_data == "YES"``, the stored items for the
       ``successful_calls`` and ``anomaly_score`` metrics are re-read and a
       summary is passed to ``sendAnonymousData``. Any exception raised in
       this step is logged and swallowed.

    Args:
        event: Lambda event; ``event['Records']`` is a list of records, each
            carrying a base64 payload at ``record['kinesis']['data']``.
        context: Lambda context object (unused).

    Returns:
        None.
    """
    payload = event['Records']
    rows = [
        base64.b64decode(record['kinesis']['data']).decode().strip().split(',')
        for record in payload
    ]
    # Build a real list: on Python 3 ``filter`` is a lazy iterator, which
    # would be logged as "<filter object ...>" instead of the rows.
    data = [row for row in rows if row[2] != "null"]
    log.info(data)

    # Group with dictionaries rather than itertools.groupby: groupby only
    # merges *adjacent* rows with equal keys, so unsorted input would make a
    # later run of the same key overwrite the earlier one. Dictionaries keep
    # first-seen order, so already-contiguous input gives identical results.
    grouped = {}
    for row in data:
        metric_key = "{0}|{1}".format(row[0], row[1])
        category_key = "{0}|{1}".format(row[2], row[3])
        grouped.setdefault(metric_key, {}).setdefault(category_key, []).append(row)

    output = {}
    for metric_key, categories in grouped.items():
        for category_key, grouped_rows in categories.items():
            output.setdefault(metric_key, {})[category_key] = merge_record_values(metric_key, grouped_rows)

    for record_key in output:
        event_time, metric_type = record_key.split('|')
        # Highest merged value first; the first entry is used as the "max"
        # for the per-IP table below.
        record_data = OrderedDict(sorted(output[record_key].items(), key=itemgetter(1), reverse=True))
        ddb_record = client.get_item(TableName=table_name,
                                     Key={'MetricType': {'S': metric_type},
                                          'EventTime': {'S': event_time}},
                                     ConsistentRead=True)
        if 'Item' not in ddb_record:
            put_record(metric_type, event_time, record_data)
        else:
            merged_data = merge_record_with_ddb(record_data, ddb_record)
            put_record_with_retry(metric_type, event_time, record_data, merged_data,
                                  int(ddb_record['Item']['ConcurrencyToken']['N']))

        if metric_type == calls_per_ip:
            top_category = next(iter(record_data))
            max_ip_count = record_data[top_category]
            # The category key is "<field 2>|<field 3>"; keep the first part.
            max_ip = top_category.split('|')[0]
            hour, minute, _ = event_time.split(':')
            ddb_max_ip = client.get_item(TableName=ip_table_name,
                                         Key={'Hour': {'S': hour},
                                              'Minute': {'S': minute}},
                                         ConsistentRead=True)
            if 'Item' not in ddb_max_ip or max_ip_count > int(ddb_max_ip['Item']['MaxCount']['N']):
                client.put_item(TableName=ip_table_name,
                                Item={'Hour': {'S': hour},
                                      'Minute': {'S': minute},
                                      'IP': {'S': max_ip},
                                      'MaxCount': {'N': str(max_ip_count)}})

    if send_anonymous_data == "YES":
        # Best effort: a failure here is logged and must not fail the batch.
        try:
            for record_key in output:
                event_time, metric_type = record_key.split('|')
                if metric_type not in (successful_calls, anomaly_score):
                    continue
                ddb_record = client.get_item(TableName=table_name,
                                             Key={'MetricType': {'S': metric_type},
                                                  'EventTime': {'S': event_time}},
                                             ConsistentRead=True)
                item = ddb_record['Item']
                metric_data = {'MetricType': item['MetricType']['S']}
                # Only the first "key: value" pair of the stored Data string
                # is reported.
                first_entry = item['Data']['S'].split(',')[0]
                if metric_type == successful_calls:
                    _, num_calls = first_entry.split(':')
                    metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('}', '').replace(' ', '')
                if metric_type == anomaly_score:
                    num_calls, anomaly_data = first_entry.split(':')
                    metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('{', '').replace('"', '').split('|')[0]
                    # Key spelling (sic) is part of the emitted payload and
                    # is kept unchanged.
                    metric_data['AnamonlyScore'] = anomaly_data.replace('}', '')
                sendAnonymousData(event_time, metric_data)
        except Exception as error:
            log.error(error)