def lambda_handler()

in src/lambda/StreamingIngestAggFeatures/lambda_function.py [0:0]


def lambda_handler(event, context):
    """Push aggregate card features from a batch of encoded records.

    Every entry in ``event['records']`` carries a base64-encoded JSON
    payload under ``data``. The payload's ``cc_num``,
    ``avg_amt_last_10m`` and ``num_trans_last_10m`` values are handed to
    ``update_agg`` together with ``CC_AGG_FEATURE_GROUP``.

    Args:
        event: Mapping that provides ``invocationId``, ``applicationArn``
            and ``records``; each record provides ``data`` and
            ``recordId``.
        context: Not used by this handler.

    Returns:
        ``{'records': [...]}`` holding one
        ``{'recordId': ..., 'result': 'Ok'}`` entry per input record, in
        input order.

    Nothing is caught here: a missing key, an undecodable payload or an
    error raised by ``update_agg`` propagates to the caller and no
    response is returned for the batch.
    """
    invocation_id = event['invocationId']
    application_arn = event['applicationArn']
    incoming = event['records']
    print(f'Received {len(incoming)} records, invocation id: {invocation_id}, app arn: {application_arn}')

    acknowledgements = []
    for record in incoming:
        # The payload arrives base64-encoded; decode, then parse the JSON.
        payload = json.loads(base64.b64decode(record['data']))

        card_number = payload['cc_num']
        txn_count = payload['num_trans_last_10m']
        avg_amount = payload['avg_amt_last_10m']

        print(f' updating agg features for card: {card_number}, avg amt last 10m: {avg_amount}, num trans last 10m: {txn_count}')
        update_agg(CC_AGG_FEATURE_GROUP, card_number, avg_amount, txn_count)

        # Acknowledge the record as "Ok" so that Kinesis won't try to
        # re-send it.
        acknowledgements.append({'recordId': record['recordId'], 'result': 'Ok'})

    return {'records': acknowledgements}