in src/lambda/StreamingIngestAggFeatures/lambda_function.py [0:0]
def lambda_handler(event, context):
    """Apply aggregate-feature updates for a batch of streamed records.

    For every entry in ``event['records']`` the base64-encoded JSON payload in
    ``rec['data']`` is decoded, its ``cc_num``, ``num_trans_last_10m`` and
    ``avg_amt_last_10m`` fields are read, and ``update_agg`` is called for
    ``CC_AGG_FEATURE_GROUP`` with those values.

    A record whose payload cannot be decoded (invalid base64, invalid JSON,
    or a missing field) no longer aborts the whole batch: it is logged and
    reported back as ``'DeliveryFailed'`` while the remaining records are
    still processed.

    Args:
        event: Invocation payload providing ``invocationId``,
            ``applicationArn`` and ``records``; each record carries a
            ``recordId`` and a ``data`` payload.
        context: Lambda context object (unused).

    Returns:
        ``{'records': [...]}`` holding one ``{'recordId', 'result'}`` entry
        per input record, in input order. ``result`` is ``'Ok'`` for records
        that were applied and ``'DeliveryFailed'`` for records whose payload
        could not be decoded.

    Raises:
        KeyError: If ``event`` lacks one of its required keys or a record has
            no ``recordId`` (without it the record cannot be reported on).
        Exception: Anything raised by ``update_agg`` propagates unchanged.
    """
    inv_id = event['invocationId']
    app_arn = event['applicationArn']
    records = event['records']
    print(f'Received {len(records)} records, invocation id: {inv_id}, app arn: {app_arn}')

    ret_records = []
    for rec in records:
        record_id = rec['recordId']

        # Only decoding/validation is guarded: binascii.Error and
        # json.JSONDecodeError are both ValueError subclasses, KeyError covers
        # missing fields, TypeError covers a payload that is not a JSON object.
        try:
            agg_data = json.loads(base64.b64decode(rec['data']))
            cc_num = agg_data['cc_num']
            num_trans_last_10m = agg_data['num_trans_last_10m']
            avg_amt_last_10m = agg_data['avg_amt_last_10m']
        except (ValueError, KeyError, TypeError) as err:
            print(f' skipping malformed record {record_id}: {err!r}')
            # Report the failure for this record only, so the rest of the
            # batch is still processed and acknowledged.
            ret_records.append({'recordId': record_id,
                                'result': 'DeliveryFailed'})
            continue

        print(f' updating agg features for card: {cc_num}, avg amt last 10m: {avg_amt_last_10m}, num trans last 10m: {num_trans_last_10m}')
        update_agg(CC_AGG_FEATURE_GROUP, cc_num, avg_amt_last_10m, num_trans_last_10m)

        # Flag the record as being "Ok", so that Kinesis won't try to re-send
        ret_records.append({'recordId': record_id,
                            'result': 'Ok'})

    return {'records': ret_records}