in detection_rules/main.py [0:0]
def search_rules(query, columns, language, count, verbose=True, rules: Dict[str, TOMLRule] = None, pager=False):
    """Use KQL or EQL to find matching rules."""
    # NOTE: the docstring above is intentionally left as a one-liner so that
    # ``__doc__`` is unchanged (CLI frameworks may surface it as help text —
    # TODO confirm); the detailed contract is documented here instead.
    #
    # Parameters
    #   query:    query text in the selected language. A falsy query (None or
    #             "") matches every rule, for both KQL and EQL.
    #   columns:  iterable of column names for the output table; each entry may
    #             itself be a comma-separated list. When empty, defaults to
    #             rule_id/file/name (or key/count/percent for an EQL query
    #             that contains a count pipe).
    #   language: "kql" or "eql"; any other value yields no matches.
    #   count:    when truthy, print only "<n> rules" and return early.
    #   verbose:  when truthy, print the result table.
    #   rules:    mapping of file name -> rule; when falsy, the default
    #             RuleCollection is used, keyed by str(rule.path).
    #   pager:    when truthy (and verbose), show the table through a pager.
    #
    # Returns the list of matched entries: flattened rule dicts for KQL, and
    # ``result.events[0].data`` of every engine result for EQL.
    from kql import get_evaluator
    from eql.table import Table
    from eql.build import get_engine
    from eql import parse_query
    from eql.pipes import CountPipe
    from .rule import get_unique_query_fields

    flattened_rules = []
    rules = rules or {str(rule.path): rule for rule in RuleCollection.default()}

    for file_name, rule in rules.items():
        # Merge the nested "metadata" and "rule" sections into the top level so
        # queries can reference their fields directly; later updates win on
        # key collisions (rule fields override metadata fields).
        flat: dict = {"file": os.path.relpath(file_name)}
        flat.update(rule.contents.to_dict())
        flat.update(flat["metadata"])
        flat.update(flat["rule"])

        tactic_names = []
        technique_ids = []
        subtechnique_ids = []

        for entry in flat['rule'].get('threat', []):
            # Only MITRE ATT&CK threat entries contribute to the derived fields.
            if entry["framework"] != "MITRE ATT&CK":
                continue

            techniques = entry.get('technique', [])
            tactic_names.append(entry['tactic']['name'])
            technique_ids.extend([t['id'] for t in techniques])
            subtechnique_ids.extend([st['id'] for t in techniques for st in t.get('subtechnique', [])])

        flat.update(techniques=technique_ids, tactics=tactic_names, subtechniques=subtechnique_ids,
                    unique_fields=get_unique_query_fields(rule))
        flattened_rules.append(flat)

    flattened_rules.sort(key=lambda dct: dct["name"])

    filtered = []
    if language == "kql":
        evaluator = get_evaluator(query) if query else lambda x: True
        filtered = list(filter(evaluator, flattened_rules))
    elif language == "eql":
        if query:
            parsed = parse_query(query, implied_any=True, implied_base=True)
            evaluator = get_engine(parsed)
            filtered = [result.events[0].data for result in evaluator(flattened_rules)]

            # A count pipe produces aggregate rows, so the default rule columns
            # would not apply; fall back to the aggregate columns instead.
            if not columns and any(isinstance(pipe, CountPipe) for pipe in parsed.pipes):
                columns = ["key", "count", "percent"]
        else:
            # Mirror the KQL branch: an empty query matches every rule rather
            # than handing empty/None text to the EQL parser.
            filtered = list(flattened_rules)

    if count:
        click.echo(f'{len(filtered)} rules')
        return filtered

    if columns:
        # Accept both repeated options and comma-separated values.
        columns = ",".join(columns).split(",")
    else:
        columns = ["rule_id", "file", "name"]

    table = Table.from_list(columns, filtered)

    if verbose:
        if pager:
            # echo_via_pager only treats ``str`` as whole text and iterates any
            # other object, so render the table explicitly to page the same
            # text that click.echo() would print.
            click.echo_via_pager(str(table))
        else:
            click.echo(table)

    return filtered