in detection_rules/eswrap.py [0:0]
def search_from_rule(self, rules: RuleCollection, start_time=None, end_time='now', size=None):
    """Search an elasticsearch instance using a rule.

    Each rule that has a query is dispatched according to its query language:
    ``kuery`` rules are batched into one multi-search request, ``lucene`` rules
    are submitted through the async search client, and ``eql`` rules are run
    one at a time through the EQL client. Rules without a query, or with any
    other language, produce no entry in the result.

    Args:
        rules: Collection of rules to iterate over.
        start_time: Start of the time range, forwarded to ``self._prep_query``.
        end_time: End of the time range, forwarded to ``self._prep_query``.
        size: Value set as ``size`` on each prepared query; falls back to
            ``self.max_events`` when falsy.

    Returns:
        dict keyed by ``rule.id``. Each value is either the output of
        ``parse_unique_field_results`` or a dict containing
        ``'error_retrieving_results': True`` (EQL failures also carry an
        ``'error'`` reason string).
    """
    async_client = AsyncSearchClient(self.client)
    survey_results = {}
    multi_search = []
    multi_search_rules = []
    async_searches = []
    eql_searches = []

    for rule in rules:
        if not rule.contents.data.get('query'):
            continue

        language = rule.contents.data.get('language')
        query = rule.contents.data.query
        rule_type = rule.contents.data.type
        # the third value returned by _prep_query is not needed here
        index_str, formatted_dsl, _ = self._prep_query(query=query,
                                                       language=language,
                                                       index=rule.contents.data.get('index', '*'),
                                                       start_time=start_time,
                                                       end_time=end_time)
        formatted_dsl.update(size=size or self.max_events)

        # prep for searches: msearch for kql | async search for lucene | eql client search for eql
        if language == 'kuery':
            # msearch bodies are (header, query) pairs; keep the rule list aligned with the pairs
            multi_search_rules.append(rule)
            multi_search.append({'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'})
            multi_search.append(formatted_dsl)
        elif language == 'lucene':
            # wait for 0 to try and force async with no immediate results (not guaranteed)
            result = async_client.submit(body=formatted_dsl, q=query, index=index_str,
                                         allow_no_indices=True, ignore_unavailable=True,
                                         wait_for_completion_timeout=0)
            if result['is_running'] is True:
                async_searches.append((rule, result['id']))
            else:
                # NOTE(review): lucene results are summarized on 'process.name' rather than the
                # rule's unique_fields (used for kuery/eql) - confirm this is intended
                survey_results[rule.id] = parse_unique_field_results(rule_type, ['process.name'],
                                                                     result['response'])
        elif language == 'eql':
            eql_body = {
                'index': index_str,
                'params': {'ignore_unavailable': 'true', 'allow_no_indices': 'true'},
                'body': {'query': query, 'filter': formatted_dsl['filter']}
            }
            eql_searches.append((rule, eql_body))

    # assemble search results
    # only call msearch when something was queued: an empty multi-search request is rejected
    # by elasticsearch and would abort the survey for rule sets that contain no kuery rules
    if multi_search:
        multi_search_results = self.client.msearch(searches=multi_search)
        for rule, result in zip(multi_search_rules, multi_search_results['responses']):
            try:
                survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
                                                                     rule.contents.data.unique_fields, result)
            except KeyError:
                survey_results[rule.id] = {'error_retrieving_results': True}

    rule: TOMLRule
    search_args: dict
    for rule, search_args in eql_searches:
        try:
            result = self.client.eql.search(**search_args)
            survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
                                                                 rule.contents.data.unique_fields, result)
        except (elasticsearch.NotFoundError, elasticsearch.RequestError) as e:
            survey_results[rule.id] = {'error_retrieving_results': True, 'error': e.info['error']['reason']}

    for rule, async_id in async_searches:
        # NOTE(review): no completion wait is requested here; presumably the response may be
        # partial if the async search is still running - confirm
        result = async_client.get(id=async_id)['response']
        survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type, ['process.name'], result)

    return survey_results