# repos/serving/lambdas/functions/execute-state-machine/lambda_function.py
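
# --- Module-level setup (sketch) ---
# The handler below references logger, client, sagemaker, sm_arn, target_job,
# target_ddb and json_serial without showing their definitions. A minimal,
# assumed setup could look like the following; the environment-variable names
# (SM_ARN, TARGET_JOB, TARGET_DDB) are illustrative assumptions, not taken
# from the original file.
import json
import logging
import os
from datetime import date, datetime

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Step Functions client used to start the state machine execution
client = boto3.client("stepfunctions")
# SageMaker client used to fail the pipeline callback step on errors
sagemaker = boto3.client("sagemaker")

# Values assumed to be injected at deployment time (see the note in the handler)
sm_arn = os.environ.get("SM_ARN")
target_job = os.environ.get("TARGET_JOB")
target_ddb = os.environ.get("TARGET_DDB")


def json_serial(obj):
    """JSON serializer for objects (e.g. datetime) not serializable by default."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Type {} not serializable".format(type(obj)))
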
def lambda_handler(event, context):
    """Triggers the state machine that runs the user-defined processing job
    (with its custom job waiter) for each queued record.

    Arguments:
        event {dict} -- Dictionary with details on the previous processing step
        context {dict} -- Dictionary with details on the Lambda context

    Returns:
        {int} -- 200 when all records have been submitted successfully
    """
    token = None
    try:
        logger.info("Lambda event is [{}]".format(event))
        # Note: For simplicity, the parameters "target_job" and "target_ddb"
        # are hardcoded values defined during deployment of the pipeline.
        # Other parameters can be retrieved dynamically.
        for record in event["Records"]:
            payload = json.loads(record["body"])
            logger.info("payload: [{}]".format(payload))
            token = payload["token"]
            arguments = payload["arguments"]
            source_bucket = arguments["bucket"]
            key_to_process = arguments["key_to_process"]
            logger.info("Trigger execution of state machine [{}]".format(sm_arn))
            # Prepare input to state machine
            message = {
                "statusCode": 200,
                "body": {
                    "bucket": source_bucket,
                    "keysRawProc": [key_to_process],
                    "targetJob": target_job,
                    "targetDDBTable": target_ddb,
                    "token": token,
                },
            }
            logger.info("Input Message is [{}]".format(message))
            client.start_execution(
                stateMachineArn=sm_arn, input=json.dumps(message, default=json_serial)
            )
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        if token:
            # Report the failure back to the waiting SageMaker pipeline callback step
            sagemaker.send_pipeline_execution_step_failure(
                CallbackToken=token, FailureReason="Fatal error"
            )
        raise e
    return 200
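

# --- Example invocation (sketch) ---
# The handler expects SQS-style records whose "body" is a JSON document carrying
# the callback token and the processing arguments. The bucket and key below are
# illustrative placeholders; running this for real requires AWS credentials and
# the deployed state machine referenced by sm_arn.
if __name__ == "__main__":
    sample_event = {
        "Records": [
            {
                "body": json.dumps(
                    {
                        "token": "example-callback-token",
                        "arguments": {
                            "bucket": "example-source-bucket",
                            "key_to_process": "input/data.csv",
                        },
                    }
                )
            }
        ]
    }
    lambda_handler(sample_event, None)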