def handler()

in infrastructure/emr_trigger/lambda_source/trigger.py [0:0]


def handler(event, context):
    logger.info(
        "Lambda metadata: {} (type = {})".format(json.dumps(event), type(event))
    )
    logger.info("Received {} messages".format(len(event["Records"])))
    try:
        table = dynamodb.Table(os.environ["TABLE_NAME"])

        latest = table.get_item(
            Key={
                "BatchId": "LATEST",
                "Name": "LATEST",
            }
        ).get("Item")

        if latest is None:
            current_batch_id = initialize_table(table)
        else:
            current_batch_id = str(latest["BatchWindowStartTime"])

        for sns_event in event["Records"]:
            logger.info("Parsing S3 Event")
            sns_message_records = json.loads(sns_event["Sns"]["Message"])["Records"]

            for record in sns_message_records:
                latest_batch, latest_bag_file = process_sns_message(
                    record, table, current_batch_id
                )

        if should_lambda_trigger_pipeline(latest_batch, latest_bag_file):
            pipeline_arn = os.environ.get("PIPELINE_ARN", "")
            cluster_name = f"demo-scene-detection-{current_batch_id}"
            execution = trigger_pipeline(current_batch_id, pipeline_arn, cluster_name)
            if execution:
                reset_batch(
                    table,
                    latest_batch,
                    pipeline_arn,
                    execution["executionArn"],
                    cluster_name,
                )

    except Exception as e:
        trc = traceback.format_exc()
        s = "Failed parsing JSON {}: {}\n\n{}".format(str(event), str(e), trc)
        logger.error(s)
        raise e