def handle_request()

in source/code/handlers/select_resources_handler.py [0:0]


    def handle_request(self):
        """
        Handles the select resources request. Creates new actions for resources found for a task.

        For every role returned by self._task_assumed_roles() and every region in self._regions the
        candidate resources are obtained (from the event, from the action class's optional
        "describe_resources" method, or from service.describe), filtered by task tag / tag filter and the
        optional action filter method, and then stored as task actions through
        self.actions_tracking.add_task_action. How resources are grouped into actions depends on
        self.aggregation_level (resource, region, account or task).

        Errors during selection and timeouts are reported through raise_exception (defined outside this
        method). The logger is always flushed before returning.

        :return: Results of handling the request: a safe_dict with the keys "datetime" (ISO timestamp),
        "running-time" (seconds as float) and "dispatched-tasks" (list of the added action items)
        """

        def filter_by_action_filter(srv, used_role, r):
            """
            Applies the optional filter method of the action class to a resource. The method is looked up
            by the name in actions.SELECT_AND_PROCESS_RESOURCE_METHOD; if the action class has no such
            attribute the resource is returned unchanged.
            :param srv: service instance that is passed to the filter method
            :param used_role: role that is passed to the filter method
            :param r: the resource to filter
            :return: The value returned by the filter method (None means the resource is not selected),
            or r itself if there is no filter method
            """
            filter_method = getattr(self.action_class, actions.SELECT_AND_PROCESS_RESOURCE_METHOD, None)
            if filter_method is not None:
                # the result of the filter method replaces the resource, so it may also alter it
                r = filter_method(srv, self._logger, self._resource_name, r, self._context,
                                  self.task, used_role)
                if r is None:
                    self._logger.debug(DEBUG_FILTER_METHOD, self.action_class.__name__, actions.SELECT_AND_PROCESS_RESOURCE_METHOD)
                    return None
                else:
                    self._logger.debug(DEBUG_FILTERED_RESOURCE, self.action_class.__name__,
                                       actions.SELECT_AND_PROCESS_RESOURCE_METHOD, safe_json(r, indent=3))

            return r

        def is_selected_resource(aws_service, resource, used_role, taskname, tags_filter, does_resource_supports_tags):
            """
            Tests if a resource is selected for the task, first by its tags and then by the optional
            filter method of the action class.
            :param aws_service: service instance, passed on to the action filter method
            :param resource: the resource to test, read with dict-style get("Tags", {})
            :param used_role: role used to retrieve the resource, passed on to the action filter method
            :param taskname: name of the task
            :param tags_filter: None to select on the task name in the task tag, the wildcard character to
            skip tag matching, or a tag filter expression string
            :param does_resource_supports_tags: False to skip all tag testing for this resource
            :return: The result of filter_by_action_filter for the resource, or None if the resource is
            not selected by its tags
            """

            # No tags then just use filter method if any
            if not does_resource_supports_tags:
                self._logger.debug(DEBUG_RESOURCE_NO_TAGS, resource)
                return filter_by_action_filter(srv=aws_service,
                                               used_role=used_role,
                                               r=resource)

            # tags is indexed by tag name below, so "Tags" is expected to be a mapping of name to value
            tags = resource.get("Tags", {})

            # name of the tag that holds the list of tasks for this resource
            tagname = self._task_tag

            if tags_filter is None:
                # test if name of the task is in list of tasks in tag value
                if (tagname not in tags) or (taskname not in tagging.split_task_list(tags[tagname])):
                    self._logger.debug(DEBUG_RESOURCE_NOT_SELECTED, safe_json(resource, indent=2), taskname,
                                       ','.join(["'{}'='{}'".format(t, tags[t]) for t in tags]))
                    return None
                self._logger.debug(DEBUG_SELECTED_BY_TASK_NAME_IN_TAG_VALUE, safe_json(resource, indent=2), tagname, taskname)
            else:
                # using a tag filter, * means any tag
                if tags_filter != tagging.tag_filter_set.WILDCARD_CHAR:
                    # test if there are any tags matching the tag filter
                    # (the filter expression is built again for every tested resource)
                    if not TagFilterExpression(tags_filter).is_match(tags):
                        self._logger.debug(DEBUG_RESOURCE_NOT_SELECTED_TAG_FILTER, safe_json(resource, indent=2), taskname,
                                           ','.join(["'{}'='{}'".format(t, tags[t]) for t in tags]))
                        return None
                    # NOTE(review): tag_filter_str is not a parameter, it is read from the enclosing
                    # handle_request scope; the only call site passes that same value as tags_filter
                    self._logger.debug(DEBUG_SELECTED_BY_TAG_FILTER, safe_json(resource, indent=2), tags, tag_filter_str, taskname)
                else:
                    self._logger.debug(DEBUG_SELECTED_WILDCARD_TAG_FILTER, safe_json(resource, indent=2), taskname)
                    return filter_by_action_filter(srv=aws_service,
                                                   used_role=used_role,
                                                   r=resource)

            # resource passed the tag tests, the action filter method has the final say
            return filter_by_action_filter(srv=aws_service,
                                           used_role=used_role,
                                           r=resource)

        def resource_batches(resources):
            """
            Returns resources as chunks of size items. If the class has an optional custom aggregation function then the
            resources are aggregated first using this function before applying the batch size.
            Without an aggregation function all resources are handled as a single group, and without a
            batch size (self.batch_size is None) every group is returned as a whole.
            :param resources: resources to process
            :return: Generator for blocks of resource items
            """

            aggregate_func = getattr(self.action_class, actions.CUSTOM_AGGREGATE_METHOD, None)

            # each i is one group of resources; groups are sliced, so they must support len() and slicing
            for i in aggregate_func(resources, self.task_parameters, self._logger) if aggregate_func is not None else [resources]:
                if self.batch_size is None:
                    yield i
                else:
                    # the last chunk of a group holds the remaining items and can be smaller than batch_size
                    first = 0
                    while first < len(i):
                        yield i[first:first + self.batch_size]
                        first += self.batch_size

        def setup_tag_filtering(t_name):
            """
            Determines how tags are used to select resources for the task.
            :param t_name: name of the task, only used for logging
            :return: Tuple (select_tag, tag_filter_string). select_tag is the name of the single tag the
            selection can be narrowed to, or None. tag_filter_string is None when selecting on the task
            name in the task tag, the wildcard character when tags are not used for selection, or the tag
            filter of the task
            """
            # get optional tag filter
            no_select_by_tags = self.action_properties.get(actions.ACTION_NO_TAG_SELECT, False)
            if no_select_by_tags:
                # action does not select by tags, this is handled as a wildcard tag filter
                tag_filter_string = tagging.tag_filter_set.WILDCARD_CHAR
            else:
                tag_filter_string = self.task.get(handlers.TASK_TAG_FILTER)

            # set if only a single tag is required for selecting the resources, it is used to optimise the select
            select_tag = None
            if tag_filter_string is None:
                self._logger.debug(DEBUG_SELECT_BY_TASK_NAME, self._resource_name, self._task_tag, t_name)
                select_tag = self._task_tag
            elif tag_filter_string == tagging.tag_filter_set.WILDCARD_CHAR:
                self._logger.debug(DEBUG_SELECT_ALL_RESOURCES, self._resource_name)
            else:
                self._logger.debug(DEBUG_TAG_FILTER_USED_TO_SELECT_RESOURCES, self._resource_name)
                # build the tag expression that is used to filter the resources
                tag_filter_expression = TagFilterExpression(tag_filter_string)
                # the keys of the used tags
                tag_filter_expression_tag_keys = list(tag_filter_expression.get_filter_keys())
                # if there is only a single tag then we can optimize by just filtering on that specific tag
                # (not possible if the key itself contains the wildcard character)
                if len(tag_filter_expression_tag_keys) == 1 and \
                        tagging.tag_filter_set.WILDCARD_CHAR not in tag_filter_expression_tag_keys[0]:
                    select_tag = tag_filter_expression_tag_keys[0]
            return select_tag, tag_filter_string

        def add_aggregated(aggregated_resources):
            """
            Creates task actions for a list of aggregated resources, one action per batch returned by
            resource_batches. Batches for which self._check_can_execute is falsy are skipped.
            :param aggregated_resources: the resources to create the actions for
            :return: Generator for the added action items
            """
            # create tasks action for aggregated resources , optionally split in batch size chunks
            for ra in resource_batches(aggregated_resources):
                if self._check_can_execute(ra):
                    # NOTE(review): assumed_role is not a parameter, it is the loop variable of the roles
                    # loop in the enclosing scope. Callers consume this generator immediately with
                    # list(...), so the role current at call time is used; for task level aggregation
                    # (called after the roles loop) that is the last role that was iterated.
                    action_item = self.actions_tracking.add_task_action(task=self.task,
                                                                        assumed_role=assumed_role,
                                                                        action_resources=ra,
                                                                        task_datetime=self.task_dt,
                                                                        source=self.source,
                                                                        task_group=self.task_group)

                    self._logger.debug(DEBUG_ADDED_AGGREGATED_RESOURCES_TASK, action_item[handlers.TASK_TR_ID], len(ra),
                                       self._resource_name, self.task[handlers.TASK_NAME])

                    self._logger.debug("Added item\n{}", safe_json(action_item, indent=3))

                    yield action_item

        def add_as_individual(resources):
            """
            Creates a task action for each individual resource. Resources for which
            self._check_can_execute is falsy are skipped.
            :param resources: the resources to create the actions for
            :return: Generator for the added action items
            """
            for ri in resources:
                # task action for each selected resource
                # (the check gets a single item list, the action itself gets the bare resource)
                if self._check_can_execute([ri]):
                    # assumed_role is the loop variable of the roles loop in the enclosing scope
                    action_item = self.actions_tracking.add_task_action(task=self.task,
                                                                        assumed_role=assumed_role,
                                                                        action_resources=ri,
                                                                        task_datetime=self.task_dt,
                                                                        source=self.source,
                                                                        task_group=self.task_group)

                    self._logger.debug(DEBUG_ADD_SINGLE_RESOURCE_TASK, action_item[handlers.TASK_TR_ID], self._resource_name,
                                       self.task[handlers.TASK_NAME])
                    self._logger.debug("Added item\n{}", safe_json(action_item, indent=3))
                    yield action_item

        try:
            # action items that were added for the task, returned as "dispatched-tasks"
            task_items = []
            start = datetime.now()

            self._logger.debug(DEBUG_EVENT, safe_json(self._event, indent=3))
            self._logger.debug(DEBUG_ACTION, safe_json(self.action_properties, indent=3))

            self._logger.info(INFO_SELECTED_RESOURCES, self._resource_name, self.service, self.task[handlers.TASK_NAME])
            self._logger.info(INFO_AGGR_LEVEL, self.aggregation_level)

            # contains resources for all accounts and regions, only used for task level aggregation
            task_level_aggregated_resources = []

            args = self._build_describe_argument()

            # test if the resource supports tags; for an empty resource name it is sufficient that the
            # service has any resource with tags, otherwise the name is matched case insensitive
            service_resource_with_tags = services.create_service(self.service).resources_with_tags
            if self._resource_name == "":
                supports_tags = len(service_resource_with_tags) != 0
            else:
                supports_tags = self._resource_name.lower() in [r.lower() for r in service_resource_with_tags]

            args["tags"] = supports_tags
            # the second argument is a text fragment for the logged message, not a flag
            self._logger.info(INFO_USE_TAGS_TO_SELECT, "R" if supports_tags else "No r")

            task_name = self.task[handlers.TASK_NAME]
            # totals that are passed to put_task_select_data if metrics are enabled
            count_resource_items = 0
            selected_resource_items = 0

            select_on_tag, tag_filter_str = setup_tag_filtering(task_name)

            filter_func = getattr(self.action_class, actions.FILTER_RESOURCE_METHOD, None)

            # timer to guard selection time and log warning if getting close to lambda timeout
            if self._context is not None:
                self.start_timer(REMAINING_TIME_AFTER_DESCRIBE)
            try:

                for assumed_role in self._task_assumed_roles():
                    retry_strategy = get_default_retry_strategy(service=self.service, context=self._context)

                    service = services.create_service(service_name=self.service,
                                                      service_retry_strategy=retry_strategy, role_arn=assumed_role)

                    # stop processing roles on a timeout, it is reported in the finally block below
                    if self.is_timed_out():
                        break

                    # contains resources for account
                    account_level_aggregated_resources = []

                    self._logger.info(INFO_ACCOUNT, service.aws_account)
                    if assumed_role not in [None, ""]:
                        self._logger.info(INFO_ASSUMED_ROLE, assumed_role)

                    for region in self._regions:

                        # test for timeouts
                        if self.is_timed_out():
                            break

                        # handle region passed in the event
                        # (args is reused for every region, so a region left from a previous iteration
                        # must be removed)
                        if region is not None:
                            args["region"] = region
                        else:
                            if "region" in args:
                                del args["region"]

                        # resources can be passed in the event by event handlers
                        all_resources = self._event.get(handlers.HANDLER_SELECT_RESOURCES, None)

                        if all_resources is None:

                            # actions can have an optional method to select resources
                            action_custom_describe_function = getattr(self.action_class, "describe_resources", None)
                            if action_custom_describe_function is not None and self.use_custom_select:
                                all_resources = action_custom_describe_function(service, self.task, region)
                            else:
                                # select resources from the service
                                self._logger.debug(DEBUG_SELECT_PARAMETERS, self._resource_name, self.service, args)
                                # selecting a list of all resources in this account/region
                                all_resources = list(service.describe(self._resource_name,
                                                                      filter_func=filter_func,
                                                                      select_on_tag=select_on_tag,
                                                                      **args))
                            # test for timeout
                            if self.is_timed_out():
                                break

                            # NOTE(review): only resources retrieved here are counted, resources passed in
                            # the event are not added to this total - confirm this is intended
                            count_resource_items += len(all_resources)

                        self._logger.info(INFO_RESOURCES_FOUND, len(all_resources))

                        # select resources that are processed by the task
                        selected_resources = []
                        for sr in all_resources:
                            sel = is_selected_resource(aws_service=service,
                                                       resource=sr,
                                                       used_role=assumed_role,
                                                       taskname=task_name,
                                                       tags_filter=tag_filter_str,
                                                       does_resource_supports_tags=supports_tags)
                            # the returned item is stored as it may differ from the tested resource
                            if sel is not None:
                                selected_resources.append(sel)

                        selected_resource_items += len(selected_resources)

                        # display found and selected resources
                        # (if no resources were found at all, the steps below run with an empty list
                        # and add nothing)
                        if len(all_resources) > 0:
                            self._logger.info(INFO_RESOURCES_SELECTED, len(selected_resources))
                            if len(selected_resources) == 0:
                                continue

                        # delete tags if not needed by the action
                        if not self.keep_tags:
                            for res in selected_resources:
                                if "Tags" in res:
                                    del res["Tags"]

                        # add resources to total list of resources for this task
                        if self.aggregation_level == actions.ACTION_AGGREGATION_TASK:
                            task_level_aggregated_resources += selected_resources

                        # add resources to list of resources for this account
                        if self.aggregation_level == actions.ACTION_AGGREGATION_ACCOUNT:
                            account_level_aggregated_resources += selected_resources

                        # add batch(es) of resources for this region
                        if self.aggregation_level == actions.ACTION_AGGREGATION_REGION and len(selected_resources) > 0:
                            task_items += list(add_aggregated(selected_resources))

                        # no aggregation, add each individual resource
                        if self.aggregation_level == actions.ACTION_AGGREGATION_RESOURCE and len(selected_resources) > 0:
                            task_items += list(add_as_individual(selected_resources))

                    # at the end of the region loop, check if aggregated resources for account need to be added
                    if self.aggregation_level == actions.ACTION_AGGREGATION_ACCOUNT and len(account_level_aggregated_resources) > 0:
                        task_items += list(add_aggregated(account_level_aggregated_resources))

                # at the end of the accounts loop, check if aggregated resources for task need to be added
                if self.aggregation_level == actions.ACTION_AGGREGATION_TASK and len(task_level_aggregated_resources) > 0:
                    task_items += list(add_aggregated(task_level_aggregated_resources))
            except Exception as ex:
                # any error while selecting is reported together with the name of the task
                raise_exception(ERR_SELECTING_TASK_RESOURCES, self.task[handlers.TASK_NAME], ex)

            finally:
                # NOTE(review): presumably raise_exception raises; if it does so in this finally block
                # the new exception replaces an exception from the selection above - confirm intended
                if self._timer is not None:
                    # cancel timer used to avoid timeouts when selecting resources
                    self._timer.cancel()
                    if self.is_timed_out():
                        raise_exception(ERR_TIMEOUT_SELECTING_RESOURCES, self._resource_name, self.service, task_name)

                    # a new timer guards the time used for storing the added actions
                    self.start_timer(REMAINING_TIME_AFTER_STORE)

                    self.actions_tracking.flush(self._timeout_event)
                    if self.is_timed_out():
                        raise_exception(ERR_CREATING_TASKS_FOR_SELECTED_RESOURCES, task_name)
                    self._timer.cancel()
                else:
                    # no timer, store the added actions without a timeout event
                    self.actions_tracking.flush()

            self._logger.info(INFO_ADDED_ITEMS, len(task_items), self.task[handlers.TASK_NAME])

            running_time = float((datetime.now() - start).total_seconds())
            self._logger.info(INFO_RESULT, running_time)

            if self.metrics:
                put_task_select_data(task_name=task_name,
                                     items=count_resource_items,
                                     selected_items=selected_resource_items,
                                     logger=self._logger,
                                     selection_time=running_time)

            return safe_dict({
                "datetime": datetime.now().isoformat(),
                "running-time": running_time,
                "dispatched-tasks": task_items
            })

        finally:
            # always write out buffered log entries, also if an exception was raised
            self._logger.flush()