in source/lambdas/notification/handler.py [0:0]
def notification(event: dict, context):
    """Handle an S3 Event Notification (for any .csv file written to any key under train/*).

    Builds the state machine input from the notification's bucket and key, attaches
    the configuration loaded from that bucket, and starts a Step Functions execution
    named after the event id. If the configuration cannot be loaded, or loads but
    fails validation, a ``serviceError`` entry is stored under ``"error"`` in the
    state machine input instead of raising; the execution is started either way.

    :param dict event: AWS Lambda Event (in this case, an S3 Event message)
    :param context: The AWS Lambda Context object (not used by this handler)
    :return: None
    """
    # Get the event data, then read the default config file
    evt = Event(event)
    s3_config = None

    # Build the input to the state machine
    state_input = {"bucket": evt.bucket, "dataset_file": evt.key}

    logger.info(
        "Triggered by s3 notification on bucket %s, key %s", evt.bucket, evt.key
    )

    try:
        s3_config = Config.from_s3(evt.bucket)
        state_input["config"] = s3_config.config
    except ConfigNotFound as excinfo:
        logger.warning("The configuration file was not found")
        state_input["error"] = _build_service_error("ConfigNotFound", str(excinfo))
    except ValueError as excinfo:
        logger.warning("There was a problem with the config file: %s", excinfo)
        state_input["error"] = _build_service_error("ValueError", str(excinfo))

    # Validate the config file if it loaded properly. s3_config is still None when
    # Config.from_s3 raised, so validation is skipped in that case.
    if s3_config:
        errors = s3_config.validate()
        if errors:
            for error in errors:
                logger.warning("config problem: %s", error)

            # All validation problems are reported together, one per line.
            state_input["error"] = _build_service_error(
                "ConfigError", "\n".join(errors)
            )

    # Start the AWS Step Function automation of Amazon Forecast. This happens even
    # when an error was recorded above: the error is passed along in the input.
    # NOTE(review): environ.get yields None if STATE_MACHINE_ARN is unset - confirm
    # the deployment always provides it.
    sfn = get_sfn_client()
    sfn.start_execution(
        stateMachineArn=environ.get("STATE_MACHINE_ARN"),
        name=evt.event_id,
        input=json.dumps(state_input),
    )


def _build_service_error(error: str, message: str) -> dict:
    """Build the ``serviceError`` payload stored under ``"error"`` in the state input.

    :param str error: The error name placed in the ``Error`` field
    :param str message: The text placed under ``errorMessage`` in the JSON ``Cause``
    :return: A dict of the form ``{"serviceError": {"Error": ..., "Cause": ...}}``
    """
    return {
        "serviceError": {
            "Error": error,
            # Cause is a JSON-encoded string, not a nested object.
            "Cause": json.dumps({"errorMessage": message}),
        }
    }