def kibana_diff()

in detection_rules/devtools.py [0:0]


def kibana_diff(rule_id, repo, branch, threads):
    """Diff rules against their version represented in kibana if exists."""
    from .misc import get_kibana_rules

    rules = RuleCollection.default()

    if rule_id:
        rules = rules.filter(lambda r: r.id in rule_id).id_map
    else:
        rules = rules.filter(production_filter).id_map

    repo_hashes = {r.id: r.contents.get_hash(include_version=True) for r in rules.values()}

    kibana_rules = {r['rule_id']: r for r in get_kibana_rules(repo=repo, branch=branch, threads=threads).values()}
    kibana_hashes = {r['rule_id']: dict_hash(r) for r in kibana_rules.values()}

    missing_from_repo = list(set(kibana_hashes).difference(set(repo_hashes)))
    missing_from_kibana = list(set(repo_hashes).difference(set(kibana_hashes)))

    rule_diff = []
    for rule_id, rule_hash in repo_hashes.items():
        if rule_id in missing_from_kibana:
            continue
        if rule_hash != kibana_hashes[rule_id]:
            rule_diff.append(
                f'versions - repo: {rules[rule_id].contents.autobumped_version}, '
                f'kibana: {kibana_rules[rule_id]["version"]} -> '
                f'{rule_id} - {rules[rule_id].contents.name}'
            )

    diff = {
        'missing_from_kibana': [f'{r} - {rules[r].name}' for r in missing_from_kibana],
        'diff': rule_diff,
        'missing_from_repo': [f'{r} - {kibana_rules[r]["name"]}' for r in missing_from_repo]
    }

    diff['stats'] = {k: len(v) for k, v in diff.items()}
    diff['stats'].update(total_repo_prod_rules=len(rules), total_gh_prod_rules=len(kibana_rules))

    click.echo(json.dumps(diff, indent=2, sort_keys=True))
    return diff