in detection_rules/main.py [0:0]
def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool,
exceptions_import: bool, directory: click.Path, save_directory: click.Path,
action_connectors_directory: click.Path, exceptions_directory: click.Path,
skip_errors: bool, default_author: str, strip_none_values: bool):
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
errors = []
rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else []
rule_files = sorted(set(rule_files + list(input_file)))
file_contents = []
for rule_file in rule_files:
file_contents.extend(load_rule_contents(Path(rule_file)))
if not file_contents:
click.echo("Must specify at least one file!")
exceptions_containers = {}
exceptions_items = {}
exceptions_containers, exceptions_items, _, unparsed_results = parse_exceptions_results_from_api(file_contents)
action_connectors, unparsed_results = parse_action_connector_results_from_api(unparsed_results)
file_contents = unparsed_results
exception_list_rule_table = {}
action_connector_rule_table = {}
rule_count = 0
for contents in file_contents:
# Don't load exceptions as rules
if contents.get("type") not in get_args(definitions.RuleType):
click.echo(f"Skipping - {contents.get("type")} is not a supported rule type")
continue
base_path = contents.get("name") or contents.get("rule", {}).get("name")
base_path = rulename_to_filename(base_path) if base_path else base_path
if base_path is None:
raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}")
rule_path = os.path.join(save_directory if save_directory is not None else RULES_DIRS[0], base_path)
# handle both rule json formats loaded from kibana and toml
data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id")
additional = ["index"] if not data_view_id else ["data_view_id"]
# Use additional to store all available fields for the rule
additional += [key for key in contents if key not in additional and contents.get(key, None)]
# use default author if not provided
contents["author"] = contents.get("author") or default_author or [contents.get("created_by")]
if isinstance(contents["author"], str):
contents["author"] = [contents["author"]]
output = rule_prompt(
rule_path,
required_only=required_only,
save=True,
verbose=True,
additional_required=additional,
skip_errors=skip_errors,
strip_none_values=strip_none_values,
**contents,
)
# If output is not a TOMLRule
if isinstance(output, str):
errors.append(output)
else:
rule_count += 1
if contents.get("exceptions_list"):
# For each item in rule.contents.data.exceptions_list to the exception_list_rule_table under the list_id
for exception in contents["exceptions_list"]:
exception_id = exception["list_id"]
if exception_id not in exception_list_rule_table:
exception_list_rule_table[exception_id] = []
exception_list_rule_table[exception_id].append({"id": contents["id"], "name": contents["name"]})
if contents.get("actions"):
# If rule has actions with connectors, add them to the action_connector_rule_table under the action_id
for action in contents["actions"]:
action_id = action["id"]
if action_id not in action_connector_rule_table:
action_connector_rule_table[action_id] = []
action_connector_rule_table[action_id].append({"id": contents["id"], "name": contents["name"]})
# Build TOMLException Objects
if exceptions_import:
_, e_output, e_errors = build_exception_objects(
exceptions_containers,
exceptions_items,
exception_list_rule_table,
exceptions_directory,
save_toml=True,
skip_errors=skip_errors,
verbose=True,
)
for line in e_output:
click.echo(line)
errors.extend(e_errors)
# Build TOMLActionConnector Objects
if action_connector_import:
_, ac_output, ac_errors = build_action_connector_objects(
action_connectors,
action_connector_rule_table,
action_connectors_directory,
save_toml=True,
skip_errors=skip_errors,
verbose=True,
)
for line in ac_output:
click.echo(line)
errors.extend(ac_errors)
exceptions_count = 0 if not exceptions_import else len(exceptions_containers) + len(exceptions_items)
click.echo(f"{rule_count + exceptions_count + len(action_connectors)} results exported")
click.echo(f"{rule_count} rules converted")
click.echo(f"{exceptions_count} exceptions exported")
click.echo(f"{len(action_connectors)} actions connectors exported")
if errors:
err_file = save_directory if save_directory is not None else RULES_DIRS[0] / "_errors.txt"
err_file.write_text("\n".join(errors))
click.echo(f"{len(errors)} errors saved to {err_file}")