in src/helper.py [0:0]
def get_user_params(job_data, required_param_list):
    """Pull and validate the required user parameters from job_data.

    The parameters are read from
    ``job_data['actionConfiguration']['configuration']['UserParameters']``,
    which is expected to hold a JSON-encoded string so that multiple values
    can be passed in a single field.

    Args:
        job_data (dict): Job data from event
        required_param_list (list): Names of the parameters that must be
            present in the decoded UserParameters

    Returns:
        :obj:`json`: Returns json object (dict) of the user parameters

    Raises:
        KeyError: If job_data is a dict that lacks one of the nested keys
            leading to ``UserParameters``.
        Exception: If UserParameters cannot be decoded as JSON, or if any
            name in required_param_list is missing from the decoded value.
    """
    logger.debug(f'Getting User Parameters ({", ".join(required_param_list)})')
    try:
        # Get the user parameters which contain the stack, artifact and file settings
        user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
        decoded_parameters = json.loads(user_parameters)
        logger.info(f"decoded_parameters:{decoded_parameters}")
    except (TypeError, ValueError) as e:
        # We're expecting the user parameters to be encoded as JSON so we can pass multiple values.
        # json.loads raises TypeError for non-string input and json.JSONDecodeError
        # (a ValueError subclass) for malformed JSON text; both mean the value
        # could not be decoded, so fail the job with a helpful message.
        raise Exception('UserParameters could not be decoded as JSON') from e
    for param in required_param_list:
        if param not in decoded_parameters:
            # Every required parameter must be provided, otherwise fail the
            # job with a helpful message naming the missing one.
            raise Exception(f'Error: Parameter ({param}) not found in Function Invoke.')
    return decoded_parameters