# def kibana_export_rules()
#
# in detection_rules/kbwrap.py [0:0]


def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path],
                        exceptions_directory: Optional[Path], default_author: str,
                        rule_id: Optional[Iterable[str]] = None, rule_name: Optional[str] = None,
                        export_action_connectors: bool = False,
                        export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False,
                        no_tactic_filename: bool = False, local_creation_date: bool = False,
                        local_updated_date: bool = False) -> List[TOMLRule]:
    """Export custom rules from Kibana."""
    # NOTE(review): the docstring is deliberately kept to one line. The ``ctx: click.Context`` parameter
    # suggests this is a click command, in which case the docstring doubles as the --help text
    # (decorators are not visible here -- confirm before expanding it).
    #
    # Flow:
    #   1. Resolve the rule IDs (explicit ``rule_id`` or a lookup by ``rule_name``) and call the export API.
    #   2. When exceptions or action connectors were requested, split the response into rule,
    #      exception and action connector records using the counts in its last record.
    #   3. Convert every rule record into a TOMLRule, recording which rules reference which exception
    #      lists and action connectors.
    #   4. Build the exception / action connector objects, then save rules, exceptions and connectors.
    #   5. Echo a summary and, if anything failed, write the messages to ``<directory>/_errors.txt``.
    #
    # Returns the converted rules (``exported``), which includes rules whose save failed and was skipped.
    # Raises click.UsageError when both ``rule_id`` and ``rule_name`` are given. Conversion and save
    # errors are re-raised unless ``skip_errors`` is set, in which case they are echoed and collected.
    kibana = ctx.obj["kibana"]
    kibana_include_details = export_exceptions or export_action_connectors

    # Only allow one of rule_id or rule_name
    if rule_name and rule_id:
        raise click.UsageError("Cannot use --rule-id and --rule-name together. Please choose one.")

    with kibana:
        # Look up rule IDs by name if --rule-name was provided
        if rule_name:
            found = RuleResource.find(filter=f"alert.attributes.name:{rule_name}")
            rule_id = [r["rule_id"] for r in found]
            if not rule_id:
                # A name that matched nothing must not fall through to export_rules with an empty ID
                # list. NOTE(review): an empty list presumably means "export everything" -- confirm
                # against RuleResource.export_rules. Either way there is nothing to export here.
                click.echo("No rules found to export")
                return []
        # ``rule_id`` defaults to None, which list() cannot consume; treat it as "no explicit IDs".
        results = RuleResource.export_rules(list(rule_id or []), exclude_export_details=not kibana_include_details)

    # Handle Exceptions Directory Location
    if results and exceptions_directory:
        exceptions_directory.mkdir(parents=True, exist_ok=True)
    exceptions_directory = exceptions_directory or RULES_CONFIG.exception_dir
    if not exceptions_directory and export_exceptions:
        click.echo("Warning: Exceptions export requested, but no exceptions directory found")

    # Handle Actions Connector Directory Location
    if results and action_connectors_directory:
        action_connectors_directory.mkdir(parents=True, exist_ok=True)
    action_connectors_directory = action_connectors_directory or RULES_CONFIG.action_connector_dir
    if not action_connectors_directory and export_action_connectors:
        click.echo("Warning: Action Connector export requested, but no Action Connector directory found")

    if not results:
        click.echo("No rules found to export")
        return []
    directory.mkdir(parents=True, exist_ok=True)

    # Without export details the whole response consists of rule records.
    rules_results, exception_results, action_connector_results = (
        _split_export_results(results) if kibana_include_details else (results, [], [])
    )

    # The flag and the config setting are loop-invariant, so resolve them once.
    omit_tactic = no_tactic_filename or RULES_CONFIG.no_tactic_filename

    errors = []
    exported = []
    exception_list_rule_table = {}
    action_connector_rule_table = {}
    for rule_resource in rules_results:
        try:
            if strip_version:
                rule_resource.pop("revision", None)
                rule_resource.pop("version", None)
            # Author precedence: value on the rule, then the default author, then the rule's creator.
            author = rule_resource.get("author") or default_author or [rule_resource.get("created_by")]
            rule_resource["author"] = [author] if isinstance(author, str) else author
            # Inherit maturity and optionally local dates from the rule if it already exists
            params = {
                "rule": rule_resource,
                "maturity": "development",
            }
            threat = rule_resource.get("threat")
            first_tactic = threat[0].get("tactic").get("name") if threat else ""
            tactic_name = None if omit_tactic else first_tactic
            # Use a dedicated local name so the ``rule_name`` parameter is not overwritten.
            filename = rulename_to_filename(rule_resource.get("name"), tactic_name=tactic_name)

            save_path = directory / f"{filename}"
            params.update(
                update_metadata_from_file(
                    save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
                )
            )
            contents = TOMLRuleContents.from_rule_resource(**params)
            rule = TOMLRule(contents=contents, path=save_path)
        except Exception as e:
            if skip_errors:
                click.echo(f'- skipping {rule_resource.get("name")} - {type(e).__name__}')
                errors.append(f'- {rule_resource.get("name")} - {e}')
                continue
            raise

        # Record which rules reference each exception list, keyed by list_id.
        for exception in rule.contents.data.exceptions_list or []:
            exception_list_rule_table.setdefault(exception["list_id"], []).append(
                {"id": rule.id, "name": rule.name}
            )
        # Record which rules reference each action connector, keyed by connector id.
        for action in rule.contents.data.actions or []:
            action_connector_rule_table.setdefault(action["id"], []).append(
                {"id": rule.id, "name": rule.name}
            )

        exported.append(rule)

    # Parse exceptions results from API return
    exceptions = []
    if export_exceptions:
        exceptions_containers, exceptions_items, parse_errors, _ = parse_exceptions_results_from_api(exception_results)
        errors.extend(parse_errors)

        # Build TOMLException Objects
        exceptions, e_output, e_errors = build_exception_objects(
            exceptions_containers,
            exceptions_items,
            exception_list_rule_table,
            exceptions_directory if exceptions_directory else None,
            save_toml=False,
            skip_errors=skip_errors,
            verbose=False,
        )
        for line in e_output:
            click.echo(line)
        errors.extend(e_errors)

    # Parse action connector results from API return
    action_connectors = []
    if export_action_connectors:
        action_connector_results, _ = parse_action_connector_results_from_api(action_connector_results)

        # Build TOMLActionConnector Objects
        action_connectors, ac_output, ac_errors = build_action_connector_objects(
            action_connector_results,
            action_connector_rule_table,
            action_connectors_directory=action_connectors_directory if action_connectors_directory else None,
            save_toml=False,
            skip_errors=skip_errors,
            verbose=False,
        )
        for line in ac_output:
            click.echo(line)
        errors.extend(ac_errors)

    # The objects above were built with save_toml=False; everything is written to disk here.
    saved = _save_toml_objects(exported, lambda rule: rule.contents.data.name, skip_errors, errors)
    # NOTE(review): assumes exception objects expose ``rule_name`` (connectors use ``name``) -- confirm.
    saved_exceptions = _save_toml_objects(exceptions, lambda exception: exception.rule_name, skip_errors, errors)
    saved_action_connectors = _save_toml_objects(action_connectors, lambda action: action.name, skip_errors, errors)

    click.echo(f"{len(results)} results exported")
    click.echo(f"{len(exported)} rules converted")
    click.echo(f"{len(exceptions)} exceptions exported")
    click.echo(f"{len(action_connectors)} action connectors exported")
    click.echo(f"{len(saved)} rules saved to {directory}")
    click.echo(f"{len(saved_exceptions)} exception lists saved to {exceptions_directory}")
    click.echo(f"{len(saved_action_connectors)} action connectors saved to {action_connectors_directory}")
    if errors:
        err_file = directory / "_errors.txt"
        # Explicit encoding so rule names with non-ASCII characters do not depend on the locale.
        err_file.write_text("\n".join(errors), encoding="utf-8")
        click.echo(f"{len(errors)} errors saved to {err_file}")

    return exported


def _split_export_results(results: list) -> tuple:
    """Split an export response that includes details into (rules, exceptions, action connectors).

    The last record of ``results`` holds the ``exported_*_count`` values; the records before it are
    sliced in that order: rules, then exception lists and items together, then action connectors.
    """
    details = results[-1]
    rules_count = details["exported_rules_count"]
    exceptions_count = details["exported_exception_list_count"] + details["exported_exception_list_item_count"]
    action_connector_count = details["exported_action_connector_count"]

    exceptions_end = rules_count + exceptions_count
    return (
        results[:rules_count],
        results[rules_count:exceptions_end],
        results[exceptions_end:exceptions_end + action_connector_count],
    )


def _save_toml_objects(objects, label_of, skip_errors: bool, errors: List[str]) -> list:
    """Call ``save_toml()`` on each object and return the objects that were saved.

    ``label_of`` maps an object to the name used in messages; it is only called when a save fails.
    With ``skip_errors`` a failure is echoed, appended to ``errors`` and skipped; otherwise it is re-raised.
    """
    saved = []
    for obj in objects:
        try:
            obj.save_toml()
        except Exception as e:
            if not skip_errors:
                raise
            label = label_of(obj)
            click.echo(f"- skipping {label} - {type(e).__name__}")
            errors.append(f"- {label} - {e}")
            continue
        saved.append(obj)
    return saved