in infrastructure/emr_trigger/lambda_source/trigger.py
import json
import logging
import os
import traceback

import boto3

# Module-level setup assumed by the handler. The helper functions it calls
# (initialize_table, process_sns_message, should_lambda_trigger_pipeline,
# trigger_pipeline, reset_batch) are assumed to be defined elsewhere in this module.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource("dynamodb")


def handler(event, context):
    logger.info(
        "Lambda metadata: {} (type = {})".format(json.dumps(event), type(event))
    )
    logger.info("Received {} messages".format(len(event["Records"])))
    try:
        # Batch state lives in DynamoDB; the "LATEST" item tracks the batch
        # window currently being filled.
        table = dynamodb.Table(os.environ["TABLE_NAME"])
        latest = table.get_item(
            Key={
                "BatchId": "LATEST",
                "Name": "LATEST",
            }
        ).get("Item")
        if latest is None:
            # First invocation: seed the table and open a new batch window.
            current_batch_id = initialize_table(table)
        else:
            current_batch_id = str(latest["BatchWindowStartTime"])

        for sns_event in event["Records"]:
            logger.info("Parsing S3 Event")
            # Each SNS record carries an S3 event notification as a JSON string.
            sns_message_records = json.loads(sns_event["Sns"]["Message"])["Records"]
            for record in sns_message_records:
                latest_batch, latest_bag_file = process_sns_message(
                    record, table, current_batch_id
                )
                if should_lambda_trigger_pipeline(latest_batch, latest_bag_file):
                    pipeline_arn = os.environ.get("PIPELINE_ARN", "")
                    cluster_name = f"demo-scene-detection-{current_batch_id}"
                    execution = trigger_pipeline(
                        current_batch_id, pipeline_arn, cluster_name
                    )
                    if execution:
                        # Record the started execution and roll the table over
                        # to a fresh batch window.
                        reset_batch(
                            table,
                            latest_batch,
                            pipeline_arn,
                            execution["executionArn"],
                            cluster_name,
                        )
    except Exception as e:
        trc = traceback.format_exc()
        s = "Failed processing event {}: {}\n\n{}".format(str(event), str(e), trc)
        logger.error(s)
        raise
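
For local testing, the handler can be invoked with an SNS-wrapped S3 event, which is the shape it parses (Records -> Sns -> Message -> Records). The sketch below is an assumption-laden example: the bucket, key, table name, and pipeline ARN are placeholders, the module is assumed to be importable as trigger, and process_sns_message may require additional fields in the S3 record that are not shown here.

import json
import os

# Placeholder values; real resources must exist for the DynamoDB and pipeline calls.
os.environ["TABLE_NAME"] = "emr-trigger-batches"
os.environ["PIPELINE_ARN"] = "arn:aws:states:us-east-1:123456789012:stateMachine:scene-detection"

import trigger  # the module above; name assumed from trigger.py

# A minimal S3 "ObjectCreated" notification, JSON-encoded inside an SNS message,
# mirroring how the event arrives when S3 publishes to SNS and SNS invokes Lambda.
s3_notification = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "demo-bag-bucket"},
                "object": {"key": "drives/2021/file_0.bag", "size": 1024},
            },
        }
    ]
}

event = {
    "Records": [
        {"Sns": {"Message": json.dumps(s3_notification)}}
    ]
}

trigger.handler(event, None)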