in source/code/configuration/task_configuration.py [0:0]
def _verify_configuration_item(self, **task_attributes):
    """
    Verifies the parameters for creating or updating a task configuration item

    Empty values (None, empty strings, empty lists and empty dictionaries) are removed
    recursively before any check is done. After that the required parameters are checked,
    unknown parameters are rejected, every remaining attribute is passed to its verifier and
    defaults are applied for the enabled, metrics, regions and internal attributes when these
    are not in the result.

    :param task_attributes: The configuration parameters
    Constants for dictionary keys can be found in configuration/__init__.py
    :return: Verified configuration item
    :raises ValueError: A ValueError raised while verifying an attribute is re-raised with the
    name of the parameter in the message. Missing required and unknown parameters are reported
    through raise_value_error, invalid ECS memory sizes through raise_exception.
    """

    def remove_empty_attributes(o):
        # Recursively removes empty values, returns None if nothing is left.
        # Values like False and 0 are kept as their string representation is not empty.
        if isinstance(o, dict):
            cleaned_dict = {}
            for k, v in o.items():
                vv = remove_empty_attributes(v)
                if vv is not None:
                    cleaned_dict[k] = vv
            return cleaned_dict if len(cleaned_dict) > 0 else None

        if isinstance(o, list):
            cleaned_list = [c for c in (remove_empty_attributes(i) for i in o) if c is not None]
            return cleaned_list if len(cleaned_list) > 0 else None

        return o if o is not None and len(str(o)) > 0 else None

    result = {}
    valid_attributes = VALID_TASK_ATTRIBUTES

    # if all attributes are empty the cleaned result is None, use an empty dictionary in that
    # case so the required parameter test below reports the problem instead of a TypeError
    attributes = remove_empty_attributes(task_attributes) or {}

    # test for missing required parameters
    for attr in [configuration.CONFIG_TASK_NAME, configuration.CONFIG_ACTION_NAME]:
        if attr not in attributes or len(attributes[attr]) == 0:
            raise_value_error(ERR_MISSING_REQUIRED_PARAMETER, attr)

    # test for unknown parameters
    for attr in attributes:
        if attr not in valid_attributes:
            raise_value_error(ERR_UNKNOWN_PARAMETER, attr, ",".join(valid_attributes))

    task_name = attributes[configuration.CONFIG_TASK_NAME]
    result[configuration.CONFIG_TASK_NAME] = task_name

    action_name = self.validate_action(attributes[configuration.CONFIG_ACTION_NAME])
    result[configuration.CONFIG_ACTION_NAME] = action_name

    process_this_account = TaskConfiguration.as_boolean(attributes.get(configuration.CONFIG_THIS_ACCOUNT, True))
    result[configuration.CONFIG_THIS_ACCOUNT] = process_this_account
    account = self.this_account if process_this_account else None

    for attr in attributes:

        # name and action are already handled, parameters are verified last as their
        # verification uses the other verified task settings
        if attr in [configuration.CONFIG_TASK_NAME, configuration.CONFIG_ACTION_NAME, configuration.CONFIG_PARAMETERS]:
            continue

        try:
            # verify cross-account roles
            if attr == configuration.CONFIG_ACCOUNTS:
                if len(attributes[attr]) > 0:
                    result[attr] = self.verify_accounts(account, attributes[attr], action_name, task_name)
                continue

            if attr == configuration.CONFIG_TASK_CROSS_ACCOUNT_ROLE_NAME:
                result[attr] = TaskConfiguration.verify_task_role_name(attributes[attr], action_name)
                continue

            # verify boolean enabled, dryrun, debug, notifications and metrics parameters
            if attr in [
                configuration.CONFIG_ENABLED,
                configuration.CONFIG_DRYRUN,
                configuration.CONFIG_DEBUG,
                configuration.CONFIG_TASK_NOTIFICATIONS,
                configuration.CONFIG_TASK_METRICS
            ]:
                result[attr] = TaskConfiguration.as_boolean(attributes[attr])
                continue

            # verify interval (cron) expression
            if attr == configuration.CONFIG_INTERVAL:
                result[attr] = self.verify_interval(attributes[attr], attributes, action_name=action_name,
                                                    task_name=task_name)
                continue

            # verify timeout for task
            if attr == configuration.CONFIG_TASK_TIMEOUT:
                timeout = TaskConfiguration.verify_timeout(action_name, attributes[attr])
                if timeout is not None:
                    result[attr] = timeout
                continue

            # memory settings for task
            if attr in [
                configuration.CONFIG_TASK_SELECT_SIZE,
                configuration.CONFIG_TASK_EXECUTE_SIZE,
                configuration.CONFIG_TASK_COMPLETION_SIZE
            ]:
                result[attr] = TaskConfiguration.validate_lambda_size(attributes[attr])
                continue

            # Ecs memory settings for task
            if attr in [
                configuration.CONFIG_ECS_SELECT_MEMORY,
                configuration.CONFIG_ECS_EXECUTE_MEMORY,
                configuration.CONFIG_ECS_COMPLETION_MEMORY
            ]:
                try:
                    if attributes[attr] is not None:
                        result[attr] = int(attributes[attr])
                except (ValueError, TypeError):
                    # int() raises TypeError for values like lists or dictionaries, these are
                    # reported as an invalid memory size too
                    raise_exception(ERR_INVALID_ECS_MEMORY_SIZE, attributes[attr], attr)
                continue

            # verify timezone
            if attr == configuration.CONFIG_TIMEZONE:
                result[attr] = self.verified_timezone(attributes[attr]) or DEFAULT_TIMEZONE
                continue

            # verify tag filter
            if attr == configuration.CONFIG_TAG_FILTER:
                tag_filter = TaskConfiguration.validate_tagfilter(attributes[attr], action_name)
                if tag_filter is not None:
                    result[attr] = tag_filter
                continue

            # verify events
            if attr == configuration.CONFIG_EVENTS:
                result[attr] = TaskConfiguration.validate_events(attributes[attr], action_name)
                continue

            # verify event scopes
            if attr == configuration.CONFIG_EVENT_SCOPES:
                result[attr] = TaskConfiguration.validate_event_scopes(attributes[attr], action_name)
                continue

            # verify regions, "*" or ["*"] is stored as "*" without further validation
            if attr == configuration.CONFIG_REGIONS:
                if attributes[attr] in ["*", ["*"]]:
                    result[attr] = "*"
                else:
                    result[attr] = self.validate_regions(attributes[attr], action_name)
                continue

            # verify internal
            if attr == configuration.CONFIG_INTERNAL:
                result[attr] = TaskConfiguration.verify_internal(attributes[attr], action_name)
                continue

            # copy description and stack
            if attr in [configuration.CONFIG_DESCRIPTION, configuration.CONFIG_STACK_ID]:
                result[attr] = attributes[attr]

        except ValueError as ex:
            # add the name of the parameter that failed verification to the error
            raise ValueError("Parameter : {}, ({})".format(attr, str(ex)))

    # default for enabled parameter
    if configuration.CONFIG_ENABLED not in result:
        result[configuration.CONFIG_ENABLED] = True

    # default for metrics parameter
    if configuration.CONFIG_TASK_METRICS not in result:
        result[configuration.CONFIG_TASK_METRICS] = False

    # default for region, only set if validation without regions returns a non-empty result
    if configuration.CONFIG_REGIONS not in result:
        regions = self.validate_regions(None, action_name)
        if len(regions) > 0:
            result[configuration.CONFIG_REGIONS] = regions

    # set internal flag if the task action is internal
    if configuration.CONFIG_INTERNAL not in result:
        if actions.get_action_properties(action_name).get(actions.ACTION_INTERNAL, False):
            result[configuration.CONFIG_INTERNAL] = True

    # parameters are verified against the verified task settings collected above
    result[configuration.CONFIG_PARAMETERS] = self.verify_task_parameters(
        attributes.get(configuration.CONFIG_PARAMETERS, {}), task_settings=result, action_name=action_name)

    return result