# import_rules_into_repo() — excerpt from detection_rules/main.py

def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool,
                           exceptions_import: bool, directory: click.Path, save_directory: click.Path,
                           action_connectors_directory: click.Path, exceptions_directory: click.Path,
                           skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool,
                           local_updated_date: bool):
    """Import rules from json, toml, or yaml files containing Kibana exported rule(s).

    Collects rule content from ``input_file`` and (recursively) from ``directory``,
    separates exceptions and action connectors from the exported payload, converts
    each rule via ``rule_prompt`` (saving TOML under ``save_directory`` or the
    default rules directory), and optionally builds TOML exception/action-connector
    objects. Per-item errors are accumulated and written to ``_errors.txt``.
    """
    errors = []
    rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else []
    rule_files = sorted(set(rule_files + list(input_file)))

    file_contents = []
    for rule_file in rule_files:
        file_contents.extend(load_rule_contents(Path(rule_file)))

    if not file_contents:
        click.echo("Must specify at least one file!")
        # Stop here: continuing would only print a misleading "0 results exported" summary.
        return

    # Split the exported content: exceptions first, then action connectors;
    # whatever remains unparsed is treated as rule content.
    exceptions_containers, exceptions_items, _, unparsed_results = parse_exceptions_results_from_api(file_contents)
    action_connectors, unparsed_results = parse_action_connector_results_from_api(unparsed_results)
    file_contents = unparsed_results

    # list_id -> [{"id", "name"}, ...] of rules referencing that exception list
    exception_list_rule_table = {}
    # action connector id -> [{"id", "name"}, ...] of rules using that connector
    action_connector_rule_table = {}
    rule_count = 0
    for contents in file_contents:
        # Don't load exceptions as rules
        if contents.get("type") not in get_args(definitions.RuleType):
            # Single quotes inside the f-string keep this valid on Python < 3.12 (PEP 701).
            click.echo(f"Skipping - {contents.get('type')} is not a supported rule type")
            continue
        # handle both rule json formats loaded from kibana and toml
        base_path = contents.get("name") or contents.get("rule", {}).get("name")
        base_path = rulename_to_filename(base_path) if base_path else base_path
        if base_path is None:
            raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}")
        rule_path = os.path.join(save_directory if save_directory is not None else RULES_DIRS[0], base_path)

        # handle both rule json formats loaded from kibana and toml
        data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id")
        additional = ["index"] if not data_view_id else ["data_view_id"]

        # Use additional to store all available fields for the rule
        additional += [key for key in contents if key not in additional and contents.get(key, None)]

        # use default author if not provided
        contents["author"] = contents.get("author") or default_author or [contents.get("created_by")]
        if isinstance(contents["author"], str):
            contents["author"] = [contents["author"]]

        # Optionally carry over creation/updated dates from an existing local file.
        contents.update(
            update_metadata_from_file(
                Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
            )
        )

        output = rule_prompt(
            rule_path,
            required_only=required_only,
            save=True,
            verbose=True,
            additional_required=additional,
            skip_errors=skip_errors,
            strip_none_values=strip_none_values,
            **contents,
        )
        # rule_prompt returns an error string on failure, a TOMLRule otherwise.
        if isinstance(output, str):
            errors.append(output)
        else:
            rule_count += 1

        if contents.get("exceptions_list"):
            # For each item in rule.contents.data.exceptions_list to the exception_list_rule_table under the list_id
            for exception in contents["exceptions_list"]:
                exception_id = exception["list_id"]
                exception_list_rule_table.setdefault(exception_id, []).append(
                    {"id": contents["id"], "name": contents["name"]}
                )

        if contents.get("actions"):
            # If rule has actions with connectors, add them to the action_connector_rule_table under the action_id
            for action in contents["actions"]:
                action_connector_rule_table.setdefault(action["id"], []).append(
                    {"id": contents["id"], "name": contents["name"]}
                )

    # Build TOMLException Objects
    if exceptions_import:
        _, e_output, e_errors = build_exception_objects(
            exceptions_containers,
            exceptions_items,
            exception_list_rule_table,
            exceptions_directory,
            save_toml=True,
            skip_errors=skip_errors,
            verbose=True,
        )
        for line in e_output:
            click.echo(line)
        errors.extend(e_errors)

    # Build TOMLActionConnector Objects
    if action_connector_import:
        _, ac_output, ac_errors = build_action_connector_objects(
            action_connectors,
            action_connector_rule_table,
            action_connectors_directory,
            save_toml=True,
            skip_errors=skip_errors,
            verbose=True,
        )
        for line in ac_output:
            click.echo(line)
        errors.extend(ac_errors)

    # Only count objects that were actually imported; connectors are gated the
    # same way exceptions are, so the summary matches what was written to disk.
    exceptions_count = 0 if not exceptions_import else len(exceptions_containers) + len(exceptions_items)
    connector_count = 0 if not action_connector_import else len(action_connectors)
    click.echo(f"{rule_count + exceptions_count + connector_count} results exported")
    click.echo(f"{rule_count} rules converted")
    click.echo(f"{exceptions_count} exceptions exported")
    click.echo(f"{connector_count} actions connectors exported")
    if errors:
        # Parenthesized choice of directory, then append the filename: the original
        # expression bound "/" tighter than the ternary, so a supplied save_directory
        # became the error "file" itself. Path() also normalizes a click.Path string.
        err_dir = Path(save_directory) if save_directory is not None else RULES_DIRS[0]
        err_file = err_dir / "_errors.txt"
        err_file.write_text("\n".join(errors))
        click.echo(f"{len(errors)} errors saved to {err_file}")