def search_from_rule()

in detection_rules/eswrap.py [0:0]


    def search_from_rule(self, rules: RuleCollection, start_time=None, end_time='now', size=None):
        """Search an elasticsearch instance using the queries of one or more rules.

        Each rule that has a query is dispatched according to its query language:

        * ``kuery``  - batched together and sent as a single multi-search request
        * ``lucene`` - submitted individually through the async search client
        * ``eql``    - sent individually through the EQL search client

        Rules without a query are skipped; rules whose language is none of the
        above are not searched and get no entry in the result.

        Args:
            rules: Iterable collection of rules to search with.
            start_time: Start of the search window, forwarded to ``_prep_query``.
            end_time: End of the search window, forwarded to ``_prep_query``.
            size: Maximum number of events per search; falls back to
                ``self.max_events`` when falsy.

        Returns:
            dict mapping ``rule.id`` to either the output of
            ``parse_unique_field_results`` or, on failure, a dict containing
            ``{'error_retrieving_results': True}`` (EQL failures also carry an
            ``'error'`` reason string).
        """
        async_client = AsyncSearchClient(self.client)
        survey_results = {}
        multi_search = []
        multi_search_rules = []
        async_searches = []
        eql_searches = []

        for rule in rules:
            if not rule.contents.data.get('query'):
                continue

            language = rule.contents.data.get('language')
            query = rule.contents.data.query
            rule_type = rule.contents.data.type
            # the third element returned by _prep_query is not needed here
            index_str, formatted_dsl, _ = self._prep_query(query=query,
                                                           language=language,
                                                           index=rule.contents.data.get('index', '*'),
                                                           start_time=start_time,
                                                           end_time=end_time)
            formatted_dsl.update(size=size or self.max_events)

            # prep for searches: msearch for kql | async search for lucene | eql client search for eql
            if language == 'kuery':
                # msearch bodies alternate header / body entries; multi_search_rules keeps the rule order
                # so responses can be matched back to rules positionally
                multi_search_rules.append(rule)
                multi_search.append({'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'})
                multi_search.append(formatted_dsl)
            elif language == 'lucene':
                # wait for 0 to try and force async with no immediate results (not guaranteed)
                result = async_client.submit(body=formatted_dsl, q=query, index=index_str,
                                             allow_no_indices=True, ignore_unavailable=True,
                                             wait_for_completion_timeout=0)
                if result['is_running'] is True:
                    async_searches.append((rule, result['id']))
                else:
                    # NOTE(review): unique fields are hard-coded to process.name on the lucene path, while
                    # the kuery and eql paths use rule.contents.data.unique_fields - confirm this is intended
                    survey_results[rule.id] = parse_unique_field_results(rule_type, ['process.name'],
                                                                         result['response'])
            elif language == 'eql':
                eql_body = {
                    'index': index_str,
                    'params': {'ignore_unavailable': 'true', 'allow_no_indices': 'true'},
                    'body': {'query': query, 'filter': formatted_dsl['filter']}
                }
                eql_searches.append((rule, eql_body))

        # assemble search results
        # only issue the multi-search when there is something to send: an empty msearch body is rejected,
        # which would otherwise abort the whole survey when no kuery rules are present
        if multi_search:
            multi_search_results = self.client.msearch(searches=multi_search)
            for rule, result in zip(multi_search_rules, multi_search_results['responses']):
                try:
                    survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
                                                                         rule.contents.data.unique_fields, result)
                except KeyError:
                    # a response missing the expected keys is recorded as a failure rather than raised
                    survey_results[rule.id] = {'error_retrieving_results': True}

        for rule, search_args in eql_searches:
            try:
                result = self.client.eql.search(**search_args)
                survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
                                                                     rule.contents.data.unique_fields, result)
            except (elasticsearch.NotFoundError, elasticsearch.RequestError) as e:
                survey_results[rule.id] = {'error_retrieving_results': True, 'error': e.info['error']['reason']}

        # collect the lucene searches that were still running at submit time
        # NOTE(review): no completion wait is requested here, so the response may be partial if the
        # search is still running - confirm against the async search client behaviour
        for rule, async_id in async_searches:
            result = async_client.get(id=async_id)['response']
            survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type, ['process.name'], result)

        return survey_results