in detection_rules/rule_loader.py [0:0]
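def load_github_pr_rules(labels: list = None, repo: str = 'elastic/detection-rules', token=None, threads=50,
                         verbose=True) -> (Dict[str, TOMLRule], Dict[str, TOMLRule], Dict[str, list]):
    """Load all rules active as a GitHub PR.

    Rule files are fetched from every open pull request that carries all of the
    requested labels. Their content is contributor-supplied and has not been
    reviewed or merged, so callers should treat the returned rules as untrusted
    input.

    Args:
        labels: Label names a PR must carry (all of them) to be included. An
            empty or missing list selects every open PR.
        repo: Repository in ``owner/name`` form.
        token: GitHub token handed to ``GithubClient`` for the API calls.
        threads: Number of worker threads used to download rule files.
        verbose: Echo a progress message before downloading.

    Returns:
        A ``(new, modified, errors)`` tuple:
        ``new`` maps rule id to the rule, ordered by rule name;
        ``modified`` maps rule id to a *list* of rules (one per PR touching
        that rule), ordered by rule name;
        ``errors`` maps the rule file's base name to the list of error strings
        raised while downloading or parsing it.
    """
    from multiprocessing.pool import ThreadPool
    from pathlib import Path

    import pytoml
    import requests

    from .ghwrap import GithubClient

    # Upper bound, in seconds, on each raw-file download so that one stalled
    # connection cannot hold a worker thread (and this call) open indefinitely.
    download_timeout = 30

    github = GithubClient(token=token)
    gh_repo = github.client.get_repo(repo)

    required_labels = set(labels or [])
    open_prs = [pull for pull in gh_repo.get_pulls()
                if required_labels.issubset({lbl.name for lbl in pull.get_labels()})]

    new_rules: List[TOMLRule] = []
    modified_rules: List[TOMLRule] = []
    errors: Dict[str, list] = {}

    existing_rules = RuleCollection.default()
    pr_rules = []

    if verbose:
        click.echo('Downloading rules from GitHub PRs')

    def download_worker(pr_info):
        pull, rule_file = pr_info

        try:
            # The download is made without the API token, so the credential is
            # never sent along with this request.
            response = requests.get(rule_file.raw_url, timeout=download_timeout)
            # Fail on HTTP errors instead of trying to parse an error page as TOML.
            response.raise_for_status()

            raw_rule = pytoml.loads(response.text)
            contents = TOMLRuleContents.from_dict(raw_rule)
            rule = TOMLRule(path=rule_file.filename, contents=contents)
            rule.gh_pr = pull

            if rule in existing_rules:
                modified_rules.append(rule)
            else:
                new_rules.append(rule)

        except Exception as e:
            # Best effort: a single bad or unreachable file is recorded and must
            # not abort the remaining downloads. Files sharing a base name
            # across PRs share one entry.
            errors.setdefault(Path(rule_file.filename).name, []).append(str(e))

    for pr in open_prs:
        # Only TOML files under rules/ are considered; everything else in the PR is ignored.
        pr_rules.extend([(pr, f) for f in pr.get_files()
                         if f.filename.startswith('rules/') and f.filename.endswith('.toml')])

    # The context manager releases the worker threads even if map() raises.
    with ThreadPool(processes=threads) as pool:
        pool.map(download_worker, pr_rules)

    new = OrderedDict([(rule.contents.id, rule) for rule in sorted(new_rules, key=lambda r: r.contents.name)])
    modified = OrderedDict()

    for modified_rule in sorted(modified_rules, key=lambda r: r.contents.name):
        modified.setdefault(modified_rule.contents.id, []).append(modified_rule)

    return new, modified, errors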