def lambda_handler()

in src/lambda/InvokeFraudEndpointLambda/lambda_function.py [0:0]


def lambda_handler(event, context):
    """ This handler is triggered by incoming Kinesis events,
    which contain a payload encapsulating the transaction data.
    The Lambda will then lookup corresponding records in the
    aggregate feature groups, assemble a payload for inference,
    and call the inference endpoint to generate a prediction.
    """
    logging.debug('Received event: {}'.format(json.dumps(event, indent=2)))

    records = event['Records']
    logging.debug('Event contains {} records'.format(len(records)))
    
    ret_records = []
    for rec in records:
        # Each record has separate eventID, etc.
        event_id = rec['eventID']
        event_source_arn = rec['eventSourceARN']
        logging.debug(f'eventID: {event_id}, eventSourceARN: {event_source_arn}')

        kinesis = rec['kinesis']
        event_payload = decode_payload(kinesis['data'])

        # Collect fields from event payload
        cc_num = event_payload['cc_num']
        amount = event_payload['amount']
        logging.info(f'Event payload data: cc_num: {cc_num} transaction amount: {amount} ')

        if 'trans_ts' in event_payload:
            trans_ts = event_payload['trans_ts']
            calc_trans_time_delay(trans_ts)

        aggregate_dict, cutoff_condition = lookup_features(cc_num, amount)
        if aggregate_dict is None:
            continue

        feature_string = assemble_features(amount, aggregate_dict)
        prediction = invoke_endpoint(feature_string, cc_num)
        
        dump_stats(prediction, cc_num, amount, aggregate_dict, cutoff_condition)
        
        if prediction is not None:
            sequence_num = kinesis['sequenceNumber']
            ret_records.append({'eventId': event_id,
                            'sequenceNumber': sequence_num,
                            'prediction': prediction,
                            'statusCode': 200})

    return ret_records