in src/lambda/InvokeFraudEndpointLambda/lambda_function.py [0:0]
def lambda_handler(event, context):
""" This handler is triggered by incoming Kinesis events,
which contain a payload encapsulating the transaction data.
The Lambda will then lookup corresponding records in the
aggregate feature groups, assemble a payload for inference,
and call the inference endpoint to generate a prediction.
"""
logging.debug('Received event: {}'.format(json.dumps(event, indent=2)))
records = event['Records']
logging.debug('Event contains {} records'.format(len(records)))
ret_records = []
for rec in records:
# Each record has separate eventID, etc.
event_id = rec['eventID']
event_source_arn = rec['eventSourceARN']
logging.debug(f'eventID: {event_id}, eventSourceARN: {event_source_arn}')
kinesis = rec['kinesis']
event_payload = decode_payload(kinesis['data'])
# Collect fields from event payload
cc_num = event_payload['cc_num']
amount = event_payload['amount']
logging.info(f'Event payload data: cc_num: {cc_num} transaction amount: {amount} ')
if 'trans_ts' in event_payload:
trans_ts = event_payload['trans_ts']
calc_trans_time_delay(trans_ts)
aggregate_dict, cutoff_condition = lookup_features(cc_num, amount)
if aggregate_dict is None:
continue
feature_string = assemble_features(amount, aggregate_dict)
prediction = invoke_endpoint(feature_string, cc_num)
dump_stats(prediction, cc_num, amount, aggregate_dict, cutoff_condition)
if prediction is not None:
sequence_num = kinesis['sequenceNumber']
ret_records.append({'eventId': event_id,
'sequenceNumber': sequence_num,
'prediction': prediction,
'statusCode': 200})
return ret_records