in detection_rules/kbwrap.py [0:0]
def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path],
                        exceptions_directory: Optional[Path], default_author: str,
                        rule_id: Optional[Iterable[str]] = None, export_action_connectors: bool = False,
                        export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False
                        ) -> List[TOMLRule]:
    """Export custom rules from Kibana."""
    # NOTE: the docstring is deliberately kept to one line. If this function is registered as a
    # click command (decorators are not visible from here), click uses the docstring as help text.
    #
    # Parameters:
    #   ctx: click context; ctx.obj["kibana"] must hold the Kibana client used as a context manager.
    #   directory: destination for rule TOML files and, on errors, for "_errors.txt".
    #   action_connectors_directory: destination for action connectors; falls back to
    #       RULES_CONFIG.action_connector_dir when not given.
    #   exceptions_directory: destination for exception lists; falls back to
    #       RULES_CONFIG.exception_dir when not given.
    #   default_author: author applied to rules that come back without one.
    #   rule_id: rule ids to export; None is treated like an empty selection.
    #   export_action_connectors / export_exceptions: also export those objects; either flag makes
    #       the export request include the export-details record.
    #   skip_errors: record conversion/save failures and continue instead of re-raising.
    #   strip_version: drop the "revision" and "version" keys from each rule before conversion.
    #
    # Returns the list of successfully converted rules. Note this is the converted list, not the
    # saved one: a rule whose save failed under skip_errors is still returned.
    kibana = ctx.obj["kibana"]
    kibana_include_details = export_exceptions or export_action_connectors

    # rule_id defaults to None and list(None) raises TypeError, so normalise it to an empty list,
    # which is the same value an empty selection already produced.
    rule_ids = list(rule_id) if rule_id is not None else []
    with kibana:
        results = RuleResource.export_rules(rule_ids, exclude_export_details=not kibana_include_details)

    # Handle Exceptions Directory Location
    # Only an explicitly supplied directory is created here; the configured fallback is used as-is.
    if results and exceptions_directory:
        exceptions_directory.mkdir(parents=True, exist_ok=True)
    exceptions_directory = exceptions_directory or RULES_CONFIG.exception_dir
    if not exceptions_directory and export_exceptions:
        click.echo("Warning: Exceptions export requested, but no exceptions directory found")

    # Handle Actions Connector Directory Location
    if results and action_connectors_directory:
        action_connectors_directory.mkdir(parents=True, exist_ok=True)
    action_connectors_directory = action_connectors_directory or RULES_CONFIG.action_connector_dir
    if not action_connectors_directory and export_action_connectors:
        click.echo("Warning: Action Connector export requested, but no Action Connector directory found")

    if not results:
        click.echo("No rules found to export")
        return []
    directory.mkdir(parents=True, exist_ok=True)

    rules_results = results
    if kibana_include_details:
        # The last record carries the export counts; the preceding records are laid out as
        # rules, then exception lists and items, then action connectors.
        details = results[-1]
        rules_count = details["exported_rules_count"]
        exception_list_count = details["exported_exception_list_count"]
        exception_list_item_count = details["exported_exception_list_item_count"]
        action_connector_count = details["exported_action_connector_count"]

        # Parse rules results and exception results from API return
        rules_and_exceptions_count = rules_count + exception_list_count + exception_list_item_count
        rules_results = results[:rules_count]
        exception_results = results[rules_count:rules_and_exceptions_count]
        action_connector_results = results[
            rules_and_exceptions_count: rules_and_exceptions_count + action_connector_count
        ]

    errors = []
    exported = []
    # Map exception list ids / action connector ids to the rules that reference them.
    exception_list_rule_table = {}
    action_connector_rule_table = {}
    for rule_resource in rules_results:
        try:
            if strip_version:
                rule_resource.pop("revision", None)
                rule_resource.pop("version", None)
            rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")]
            if isinstance(rule_resource["author"], str):
                rule_resource["author"] = [rule_resource["author"]]

            threat = rule_resource.get("threat")
            first_tactic = threat[0].get("tactic").get("name") if threat else ""
            rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=first_tactic)
            rule_path = directory / f"{rule_name}"

            # Inherit maturity from a rule file that already exists at the target path
            maturity = "development"
            if rule_path.exists():
                rules = RuleCollection()
                rules.load_file(rule_path)
                if rules:
                    maturity = rules.rules[0].contents.metadata.maturity

            contents = TOMLRuleContents.from_rule_resource(rule_resource, maturity=maturity)
            rule = TOMLRule(contents=contents, path=rule_path)
        except Exception as e:
            if skip_errors:
                click.echo(f'- skipping {rule_resource.get("name")} - {type(e).__name__}')
                errors.append(f'- {rule_resource.get("name")} - {e}')
                continue
            raise

        rule_ref = {"id": rule.id, "name": rule.name}
        if rule.contents.data.exceptions_list:
            # Register this rule under the list_id of every exception list it references
            for exception in rule.contents.data.exceptions_list:
                exception_list_rule_table.setdefault(exception["list_id"], []).append(dict(rule_ref))
        if rule.contents.data.actions:
            # use connector ids as rule source
            for action in rule.contents.data.actions:
                action_connector_rule_table.setdefault(action["id"], []).append(dict(rule_ref))

        exported.append(rule)

    # Parse exceptions results from API return
    exceptions = []
    if export_exceptions:
        exceptions_containers, exceptions_items, parse_errors, _ = parse_exceptions_results_from_api(exception_results)
        errors.extend(parse_errors)

        # Build TOMLException Objects
        exceptions, e_output, e_errors = build_exception_objects(
            exceptions_containers,
            exceptions_items,
            exception_list_rule_table,
            exceptions_directory if exceptions_directory else None,
            save_toml=False,
            skip_errors=skip_errors,
            verbose=False,
        )
        for line in e_output:
            click.echo(line)
        errors.extend(e_errors)

    # Parse action connector results from API return
    action_connectors = []
    if export_action_connectors:
        action_connector_results, _ = parse_action_connector_results_from_api(action_connector_results)

        # Build TOMLActionConnector Objects
        action_connectors, ac_output, ac_errors = build_action_connector_objects(
            action_connector_results,
            action_connector_rule_table,
            action_connectors_directory=action_connectors_directory if action_connectors_directory else None,
            save_toml=False,
            skip_errors=skip_errors,
            verbose=False,
        )
        for line in ac_output:
            click.echo(line)
        errors.extend(ac_errors)

    def _save_all(items, label_of):
        """Call save_toml() on each item and return the items that were written."""
        written = []
        for item in items:
            try:
                item.save_toml()
            except Exception as e:
                if not skip_errors:
                    raise
                # label_of is only evaluated on failure, as the original per-type loops did
                click.echo(f"- skipping {label_of(item)} - {type(e).__name__}")
                errors.append(f"- {label_of(item)} - {e}")
                continue
            written.append(item)
        return written

    saved = _save_all(exported, lambda r: r.contents.data.name)
    # NOTE(review): assumes exception objects expose `rule_name` - confirm against their class
    saved_exceptions = _save_all(exceptions, lambda ex: ex.rule_name)
    saved_action_connectors = _save_all(action_connectors, lambda ac: ac.name)

    click.echo(f"{len(results)} results exported")
    click.echo(f"{len(exported)} rules converted")
    click.echo(f"{len(exceptions)} exceptions exported")
    click.echo(f"{len(action_connectors)} action connectors exported")
    click.echo(f"{len(saved)} rules saved to {directory}")
    click.echo(f"{len(saved_exceptions)} exception lists saved to {exceptions_directory}")
    click.echo(f"{len(saved_action_connectors)} action connectors saved to {action_connectors_directory}")
    if errors:
        err_file = directory / "_errors.txt"
        err_file.write_text("\n".join(errors))
        click.echo(f"{len(errors)} errors saved to {err_file}")

    return exported