in source/code/handlers/select_resources_handler.py [0:0]
def handle_request(self):
"""
Handles the select resources request. Creates new actions for resources found for a task
:return: Results of handling the request
"""
def filter_by_action_filter(srv, used_role, r):
filter_method = getattr(self.action_class, actions.SELECT_AND_PROCESS_RESOURCE_METHOD, None)
if filter_method is not None:
r = filter_method(srv, self._logger, self._resource_name, r, self._context,
self.task, used_role)
if r is None:
self._logger.debug(DEBUG_FILTER_METHOD, self.action_class.__name__, actions.SELECT_AND_PROCESS_RESOURCE_METHOD)
return None
else:
self._logger.debug(DEBUG_FILTERED_RESOURCE, self.action_class.__name__,
actions.SELECT_AND_PROCESS_RESOURCE_METHOD, safe_json(r, indent=3))
return r
def is_selected_resource(aws_service, resource, used_role, taskname, tags_filter, does_resource_supports_tags):
# No tags then just use filter method if any
if not does_resource_supports_tags:
self._logger.debug(DEBUG_RESOURCE_NO_TAGS, resource)
return filter_by_action_filter(srv=aws_service,
used_role=used_role,
r=resource)
tags = resource.get("Tags", {})
# name of the tag that holds the list of tasks for this resource
tagname = self._task_tag
if tags_filter is None:
# test if name of the task is in list of tasks in tag value
if (tagname not in tags) or (taskname not in tagging.split_task_list(tags[tagname])):
self._logger.debug(DEBUG_RESOURCE_NOT_SELECTED, safe_json(resource, indent=2), taskname,
','.join(["'{}'='{}'".format(t, tags[t]) for t in tags]))
return None
self._logger.debug(DEBUG_SELECTED_BY_TASK_NAME_IN_TAG_VALUE, safe_json(resource, indent=2), tagname, taskname)
else:
# using a tag filter, * means any tag
if tags_filter != tagging.tag_filter_set.WILDCARD_CHAR:
# test if there are any tags matching the tag filter
if not TagFilterExpression(tags_filter).is_match(tags):
self._logger.debug(DEBUG_RESOURCE_NOT_SELECTED_TAG_FILTER, safe_json(resource, indent=2), taskname,
','.join(["'{}'='{}'".format(t, tags[t]) for t in tags]))
return None
self._logger.debug(DEBUG_SELECTED_BY_TAG_FILTER, safe_json(resource, indent=2), tags, tag_filter_str, taskname)
else:
self._logger.debug(DEBUG_SELECTED_WILDCARD_TAG_FILTER, safe_json(resource, indent=2), taskname)
return filter_by_action_filter(srv=aws_service,
used_role=used_role,
r=resource)
return filter_by_action_filter(srv=aws_service,
used_role=used_role,
r=resource)
def resource_batches(resources):
"""
Returns resources as chunks of size items. If the class has an optional custom aggregation function then the
resources are aggregated first using this function before applying the batch size
:param resources: resources to process
:return: Generator for blocks of resource items
"""
aggregate_func = getattr(self.action_class, actions.CUSTOM_AGGREGATE_METHOD, None)
for i in aggregate_func(resources, self.task_parameters, self._logger) if aggregate_func is not None else [resources]:
if self.batch_size is None:
yield i
else:
first = 0
while first < len(i):
yield i[first:first + self.batch_size]
first += self.batch_size
def setup_tag_filtering(t_name):
# get optional tag filter
no_select_by_tags = self.action_properties.get(actions.ACTION_NO_TAG_SELECT, False)
if no_select_by_tags:
tag_filter_string = tagging.tag_filter_set.WILDCARD_CHAR
else:
tag_filter_string = self.task.get(handlers.TASK_TAG_FILTER)
# set if only a single task is required for selecting the resources, it is used to optimise the select
select_tag = None
if tag_filter_string is None:
self._logger.debug(DEBUG_SELECT_BY_TASK_NAME, self._resource_name, self._task_tag, t_name)
select_tag = self._task_tag
elif tag_filter_string == tagging.tag_filter_set.WILDCARD_CHAR:
self._logger.debug(DEBUG_SELECT_ALL_RESOURCES, self._resource_name)
else:
self._logger.debug(DEBUG_TAG_FILTER_USED_TO_SELECT_RESOURCES, self._resource_name)
# build the tag expression that us used to filter the resources
tag_filter_expression = TagFilterExpression(tag_filter_string)
# the keys of the used tags
tag_filter_expression_tag_keys = list(tag_filter_expression.get_filter_keys())
# if there is only a single tag then we can optimize by just filtering on that specific tag
if len(tag_filter_expression_tag_keys) == 1 and \
tagging.tag_filter_set.WILDCARD_CHAR not in tag_filter_expression_tag_keys[0]:
select_tag = tag_filter_expression_tag_keys[0]
return select_tag, tag_filter_string
def add_aggregated(aggregated_resources):
# create tasks action for aggregated resources , optionally split in batch size chunks
for ra in resource_batches(aggregated_resources):
if self._check_can_execute(ra):
action_item = self.actions_tracking.add_task_action(task=self.task,
assumed_role=assumed_role,
action_resources=ra,
task_datetime=self.task_dt,
source=self.source,
task_group=self.task_group)
self._logger.debug(DEBUG_ADDED_AGGREGATED_RESOURCES_TASK, action_item[handlers.TASK_TR_ID], len(ra),
self._resource_name, self.task[handlers.TASK_NAME])
self._logger.debug("Added item\n{}", safe_json(action_item, indent=3))
yield action_item
def add_as_individual(resources):
for ri in resources:
# task action for each selected resource
if self._check_can_execute([ri]):
action_item = self.actions_tracking.add_task_action(task=self.task,
assumed_role=assumed_role,
action_resources=ri,
task_datetime=self.task_dt,
source=self.source,
task_group=self.task_group)
self._logger.debug(DEBUG_ADD_SINGLE_RESOURCE_TASK, action_item[handlers.TASK_TR_ID], self._resource_name,
self.task[handlers.TASK_NAME])
self._logger.debug("Added item\n{}", safe_json(action_item, indent=3))
yield action_item
try:
task_items = []
start = datetime.now()
self._logger.debug(DEBUG_EVENT, safe_json(self._event, indent=3))
self._logger.debug(DEBUG_ACTION, safe_json(self.action_properties, indent=3))
self._logger.info(INFO_SELECTED_RESOURCES, self._resource_name, self.service, self.task[handlers.TASK_NAME])
self._logger.info(INFO_AGGR_LEVEL, self.aggregation_level)
task_level_aggregated_resources = []
args = self._build_describe_argument()
service_resource_with_tags = services.create_service(self.service).resources_with_tags
if self._resource_name == "":
supports_tags = len(service_resource_with_tags) != 0
else:
supports_tags = self._resource_name.lower() in [r.lower() for r in service_resource_with_tags]
args["tags"] = supports_tags
self._logger.info(INFO_USE_TAGS_TO_SELECT, "R" if supports_tags else "No r")
task_name = self.task[handlers.TASK_NAME]
count_resource_items = 0
selected_resource_items = 0
select_on_tag, tag_filter_str = setup_tag_filtering(task_name)
filter_func = getattr(self.action_class, actions.FILTER_RESOURCE_METHOD, None)
# timer to guard selection time and log warning if getting close to lambda timeout
if self._context is not None:
self.start_timer(REMAINING_TIME_AFTER_DESCRIBE)
try:
for assumed_role in self._task_assumed_roles():
retry_strategy = get_default_retry_strategy(service=self.service, context=self._context)
service = services.create_service(service_name=self.service,
service_retry_strategy=retry_strategy, role_arn=assumed_role)
if self.is_timed_out():
break
# contains resources for account
account_level_aggregated_resources = []
self._logger.info(INFO_ACCOUNT, service.aws_account)
if assumed_role not in [None, ""]:
self._logger.info(INFO_ASSUMED_ROLE, assumed_role)
for region in self._regions:
# test for timeouts
if self.is_timed_out():
break
# handle region passed in the event
if region is not None:
args["region"] = region
else:
if "region" in args:
del args["region"]
# resources can be passed in the invent by event handlers
all_resources = self._event.get(handlers.HANDLER_SELECT_RESOURCES, None)
if all_resources is None:
# actions can have an optional method to select resources
action_custom_describe_function = getattr(self.action_class, "describe_resources", None)
if action_custom_describe_function is not None and self.use_custom_select:
all_resources = action_custom_describe_function(service, self.task, region)
else:
# select resources from the service
self._logger.debug(DEBUG_SELECT_PARAMETERS, self._resource_name, self.service, args)
# selecting a list of all resources in this account/region
all_resources = list(service.describe(self._resource_name,
filter_func=filter_func,
select_on_tag=select_on_tag,
**args))
# test for timeout
if self.is_timed_out():
break
count_resource_items += len(all_resources)
self._logger.info(INFO_RESOURCES_FOUND, len(all_resources))
# select resources that are processed by the task
selected_resources = []
for sr in all_resources:
sel = is_selected_resource(aws_service=service,
resource=sr,
used_role=assumed_role,
taskname=task_name,
tags_filter=tag_filter_str,
does_resource_supports_tags=supports_tags)
if sel is not None:
selected_resources.append(sel)
selected_resource_items += len(selected_resources)
# display found and selected resources
if len(all_resources) > 0:
self._logger.info(INFO_RESOURCES_SELECTED, len(selected_resources))
if len(selected_resources) == 0:
continue
# delete tags if not needed by the action
if not self.keep_tags:
for res in selected_resources:
if "Tags" in res:
del res["Tags"]
# add resources to total list of resources for this task
if self.aggregation_level == actions.ACTION_AGGREGATION_TASK:
task_level_aggregated_resources += selected_resources
# add resources to list of resources for this account
if self.aggregation_level == actions.ACTION_AGGREGATION_ACCOUNT:
account_level_aggregated_resources += selected_resources
# add batch(es) of resources for this region
if self.aggregation_level == actions.ACTION_AGGREGATION_REGION and len(selected_resources) > 0:
task_items += list(add_aggregated(selected_resources))
# no aggregation, add each individual resource
if self.aggregation_level == actions.ACTION_AGGREGATION_RESOURCE and len(selected_resources) > 0:
task_items += list(add_as_individual(selected_resources))
# at the end of the region loop, check if aggregated resources for account need to be added
if self.aggregation_level == actions.ACTION_AGGREGATION_ACCOUNT and len(account_level_aggregated_resources) > 0:
task_items += list(add_aggregated(account_level_aggregated_resources))
# at the end of the accounts loop, check if aggregated resources for task need to be added
if self.aggregation_level == actions.ACTION_AGGREGATION_TASK and len(task_level_aggregated_resources) > 0:
task_items += list(add_aggregated(task_level_aggregated_resources))
except Exception as ex:
raise_exception(ERR_SELECTING_TASK_RESOURCES, self.task[handlers.TASK_NAME], ex)
finally:
if self._timer is not None:
# cancel time used avoid timeouts when selecting resources
self._timer.cancel()
if self.is_timed_out():
raise_exception(ERR_TIMEOUT_SELECTING_RESOURCES, self._resource_name, self.service, task_name)
self.start_timer(REMAINING_TIME_AFTER_STORE)
self.actions_tracking.flush(self._timeout_event)
if self.is_timed_out():
raise_exception(ERR_CREATING_TASKS_FOR_SELECTED_RESOURCES, task_name)
self._timer.cancel()
else:
self.actions_tracking.flush()
self._logger.info(INFO_ADDED_ITEMS, len(task_items), self.task[handlers.TASK_NAME])
running_time = float((datetime.now() - start).total_seconds())
self._logger.info(INFO_RESULT, running_time)
if self.metrics:
put_task_select_data(task_name=task_name,
items=count_resource_items,
selected_items=selected_resource_items,
logger=self._logger,
selection_time=running_time)
return safe_dict({
"datetime": datetime.now().isoformat(),
"running-time": running_time,
"dispatched-tasks": task_items
})
finally:
self._logger.flush()