def search_rule_prs()

in detection_rules/devtools.py [0:0]


def search_rule_prs(ctx, no_loop, query, columns, language, token, threads):
    """Use KQL or EQL to find matching rules from active GitHub PRs."""
    from uuid import uuid4

    from .main import search_rules

    all_rules: Dict[Path, TOMLRule] = {}
    new, modified, errors = rule_loader.load_github_pr_rules(token=token, threads=threads)

    def add_github_meta(this_rule: TOMLRule, status: str, original_rule_id: Optional[definitions.UUIDString] = None):
        pr = this_rule.gh_pr
        data = rule.contents.data
        extend_meta = {
            'status': status,
            'github': {
                'base': pr.base.label,
                'comments': [c.body for c in pr.get_comments()],
                'commits': pr.commits,
                'created_at': str(pr.created_at),
                'head': pr.head.label,
                'is_draft': pr.draft,
                'labels': [lbl.name for lbl in pr.get_labels()],
                'last_modified': str(pr.last_modified),
                'title': pr.title,
                'url': pr.html_url,
                'user': pr.user.login
            }
        }

        if original_rule_id:
            extend_meta['original_rule_id'] = original_rule_id
            data = dataclasses.replace(rule.contents.data, rule_id=str(uuid4()))

        rule_path = Path(f'pr-{pr.number}-{rule.path}')
        new_meta = dataclasses.replace(rule.contents.metadata, extended=extend_meta)
        contents = dataclasses.replace(rule.contents, metadata=new_meta, data=data)
        new_rule = TOMLRule(path=rule_path, contents=contents)

        all_rules[new_rule.path] = new_rule

    for rule_id, rule in new.items():
        add_github_meta(rule, 'new')

    for rule_id, rules in modified.items():
        for rule in rules:
            add_github_meta(rule, 'modified', rule_id)

    loop = not no_loop
    ctx.invoke(search_rules, query=query, columns=columns, language=language, rules=all_rules, pager=loop)

    while loop:
        query = click.prompt(f'Search loop - enter new {language} query or ctrl-z to exit')
        columns = click.prompt('columns', default=','.join(columns)).split(',')
        ctx.invoke(search_rules, query=query, columns=columns, language=language, rules=all_rules, pager=True)