in source/update_ddb_from_stream/update_ddb_from_stream.py [0:0]
def lambda_handler(event, context):
    """Aggregate incoming CSV metric rows and write them to DynamoDB.

    For every entry in ``event['records']`` the handler base64-decodes the
    ``data`` field as UTF-8, splits it on commas into a single row, groups
    it by ``"<field0>|<field1>"`` and then by ``"<field2>|<field3>"``, and
    stores the result of ``merge_record_values`` in ``output``.  Each key of
    ``output`` is then looked up in ``table_name``: a missing item is written
    with ``put_record``; an existing one is merged with
    ``merge_record_with_ddb`` and written with ``put_record_with_retry``.

    NOTE(review): the indentation of this function was reconstructed from
    its syntax.  The nesting below is the only layout in which ``record``
    and ``decoded_data`` are still in scope when ``output_record`` is
    built -- confirm against the deployed file.

    Args:
        event: Mapping with a ``'records'`` list.  Each record is read for
            ``'data'`` (base64 text) and ``'recordId'``.
        context: Not used by this function.

    Returns:
        ``{'records': [...]}`` with one entry per input record holding the
        original ``recordId``, ``result`` set to ``'Ok'`` and the decoded
        text re-encoded as base64.

    Raises:
        IndexError: If a decoded row has fewer than three fields, or has a
            non-"null" third field but no fourth field.
        Any exception raised by the DynamoDB client or the helper functions
        is not caught here and propagates to the caller.
    """
    log.debug('event: %s', event)
    payload = event['records']
    # Accumulates {"<field0>|<field1>": {"<field2>|<field3>": merged value}}.
    # NOTE(review): never cleared between records, so keys added by earlier
    # records are looked up and written again for every later record --
    # confirm this is intended.
    output = {}
    output_array = []
    log.debug('processing %s records', len(payload))
    # Disabled assignment kept from the original; ``event_time`` is now only
    # bound inside the ``record_key`` loop below.
    # event_time = str(datetime.now())
    for record in payload:
        decoded_data = base64.b64decode(record['data']).decode("utf-8")
        log.debug('decoded_data: %s', decoded_data)
        # The whole decoded payload is treated as ONE comma-separated row,
        # wrapped in a list so the filtering/grouping code can iterate it.
        data = [decoded_data.strip().split(',')]
        # Drop the row when its third field is the literal string "null".
        data = [x for x in data if x[2]!="null"]
        # ``data`` holds at most one row here, so each groupby yields at
        # most one group.
        for metric_key, metric_group in groupby(data, key=lambda x:"{0}|{1}".format(x[0],x[1])):
            grouped_metric = list(metric_group)
            for category_key, grouped_rows in groupby(grouped_metric, key=lambda x: "{0}|{1}".format(x[2],x[3])):
                # Overwrites any value previously stored for the same
                # metric_key / category_key pair.
                output.setdefault(metric_key, {})[category_key] = merge_record_values(metric_key, list(grouped_rows))
        for record_key in output:
            log.debug('record_key: %s', record_key)
            # record_key is "<field0>|<field1>"; the unpacking requires
            # exactly one '|' in it.
            event_time, metric_type = record_key.split('|')
            log.debug('event_time: %s', event_time)
            log.debug('metric_type: %s', metric_type)
            # Order the categories by their merged value, largest first.
            record_data = OrderedDict(sorted(iter(output[record_key].items()), key=itemgetter(1), reverse=True))
            # Strongly consistent read of the current item for this key.
            ddb_record = client.get_item(
                TableName=table_name,
                Key={
                    'MetricType': {'S':metric_type},
                    'EventTime': {'S':event_time}
                },
                ConsistentRead=True
            )
            if 'Item' not in ddb_record:
                put_record(metric_type, event_time, record_data)
            else:
                merged_data = merge_record_with_ddb(record_data, ddb_record)
                # The stored ConcurrencyToken is passed along with the data;
                # presumably used for a conditional write -- verify in
                # put_record_with_retry.
                put_record_with_retry(metric_type, event_time, record_data, merged_data, int(ddb_record['Item']['ConcurrencyToken']['N']))
        # Every record is reported as 'Ok', including rows that were
        # filtered out above.
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(decoded_data.encode("utf-8")).decode("utf-8")
        }
        output_array.append(output_record)
    if send_anonymous_data == "TRUE":
        try:
            metric_data = {}
            metric_data['RecordsProcessed'] = len(payload)
            # NOTE(review): ``event_time`` is whatever the last iteration of
            # the record_key loop left behind; it is unbound when no key was
            # ever processed, in which case the error is caught and logged
            # by the handler below.
            sendAnonymousData(event_time, metric_data)
        except Exception as error:
            log.error('send_anonymous_data error: %s', error)
    else:
        log.info('Anonymous usage metrics collection disabled.')
    log.debug('returning records: %s', output_array)
    return {'records': output_array}