# prebuilt-rules-scripts/generate.py
"""Manage documentation generation for pre-built rules."""
import re
import shutil
import textwrap
from pathlib import Path
from typing import List
import click
import json
import yaml
# Repository root (this file lives in <root>/prebuilt-rules-scripts/).
ROOT = Path(__file__).resolve().parent.parent
# Folder containing the generation scripts and their intermediate JSON files.
PREBUILT_RULES = ROOT.joinpath('prebuilt-rules-scripts')
# Staging folder for the generated asciidoc output of a build.
GENERATED_ASCII = ROOT.joinpath('generated-ascii-files')
# Relative path of the prepackaged detection rules inside a Kibana checkout.
DEFAULT_KIBANA_RULES_DIR = str(Path().joinpath('x-pack', 'plugins', 'security_solution', 'server', 'lib',
                                               'detection_engine', 'prebuilt_rules', 'content', 'prepackaged_rules'))
# Default local Kibana checkout: assumed to sit next to this repository.
DEFAULT_LOCAL_KIBANA = ROOT.joinpath('..', 'kibana')
class Version(tuple):
    """A sortable version parsed from a dotted/dashed string such as "7.9.1".

    Numeric components compare as ints.  If an int is passed in it is returned
    unchanged instead of being wrapped (preserved legacy behavior).
    """

    def __new__(cls, version):
        if not isinstance(version, (int, list, tuple)):
            # Split on '.' and '-', keeping numeric parts as ints so that
            # e.g. 7.10 sorts after 7.9.
            version = tuple(int(part) if part.isdigit() else part
                            for part in re.split(r'[.-]', version))
        if isinstance(version, int):
            return version
        return tuple.__new__(cls, version)

    def bump(self):
        """Increment the version."""
        parts = list(self)
        parts[-1] = parts[-1] + 1
        return Version(parts)

    def __str__(self):
        """Convert back to a string."""
        return ".".join(map(str, self))

    @classmethod
    def sort_as_strings(cls, *version_strings: str, reverse=False) -> List[str]:
        """Sort version strings numerically and return them as strings."""
        ordered = sorted((Version(v) for v in version_strings), reverse=reverse)
        return [str(v) for v in ordered]
# Top-level click command group; subcommands (e.g. `generate`) attach to it.
# The docstring below is the CLI help text shown by -h/--help.
@click.group('root', context_settings={'help_option_names': ['-h', '--help']})
def root():
    """Commands for generating rule documentation."""
def _get_release_versions() -> List[Version]:
    """Collect every release version found in diff-files/final-files.

    File names look like final-rule-file-<version>.json; the version is the
    segment between the last '-' and the '.json' suffix.
    """
    final_files = PREBUILT_RULES.joinpath('diff-files', 'final-files').glob('*.json')
    version_strings = (str(path).rsplit('-', 1)[1].rsplit('.', 1)[0] for path in final_files)
    return [Version(v) for v in version_strings]
def _get_last_release_version() -> str:
    """Get latest release version based on final-files versioning."""
    latest = max(_get_release_versions())
    return str(latest)
def _sort_by_name(rule):
return rule['name']
def _sort_tag_by_name(rule):
return rule['tag']
def _translate_interval_period(interval):
units = ""
length = ""
runtime = re.match(r"([0-9]+)([a-z]+)", interval, re.I)
if runtime:
runtime = runtime.groups()
if len(runtime) == 2:
if runtime[1] == 'm':
units = "minutes"
elif runtime[1] == 's':
units = "seconds"
elif runtime[1] == 'h':
units = "hours"
elif runtime[1] == 'H':
units = "hours"
elif runtime[1] == 'd':
units = "days"
elif runtime[1] == 'w':
units = "weeks"
elif runtime[1] == 'M':
units = "months"
elif runtime[1] == 'y':
units = "years"
else:
units = ""
length = runtime[0]
if length == "1":
units = units[:-1]
return str(length + " " + units)
def _left_align(text):
return '\n'.join([t.lstrip() for t in text.splitlines()])
def _convert_name_to_filename(name: str) -> str:
name = re.sub(r'[():]', '', name.lower())
name = re.sub(r'[ -+/\\]+', '-', name)
name = re.sub(r'-+', '-', name)
return name
def get_release_rules(package_version, local_kibana, rules_dir):
    """Dump all prebuilt rules from a local Kibana checkout into one JSON file.

    Reads every *.json rule under <local_kibana>/<rules_dir>, sorts the rules
    by name, and writes them to
    orig-rules-json-files/<package_version>-prebuilt-rule.json.
    """
    source_dir = Path(local_kibana).resolve().joinpath(rules_dir)
    assert source_dir.exists(), f'Rules folder does not exist in {local_kibana}'
    rules = []
    for rule_path in source_dir.glob('*.json'):
        with open(rule_path, 'r') as f:
            rules.append(json.load(f))
    rules.sort(key=_sort_by_name)
    dump_path = str(PREBUILT_RULES.joinpath('orig-rules-json-files', f'{package_version}-prebuilt-rule.json'))
    with open(dump_path, "w") as f:
        json.dump(rules, f, indent=2)
    click.echo(f'saved file: {dump_path}')
def create_json_from_docs(package_version):
    """Create a json file of the existing rule docs.

    Scans every rule page under docs/detections/prebuilt-rules/rule-details and
    scrapes the rule name, description, potential-false-positives text and
    investigation guide back out of the rendered asciidoc.  The result is
    written (sorted by name) to
    diff-files/gen-files/json-from-docs-<package_version>.json.
    """
    existing_rule_asciidocs = ROOT.joinpath('docs', 'detections', 'prebuilt-rules', 'rule-details')
    rule_asciidoc_files = existing_rule_asciidocs.glob("*.asciidoc")
    rule_dict = []
    # Accumulators for the rule currently being parsed.
    name = ""
    description = ""
    false_pos = ""
    notes = ""
    # Line-scanner state flags: which section of the page we are inside.
    is_desc = False
    is_false_pos = False
    is_notes = False
    for rule_asciidoc_file in rule_asciidoc_files:
        with open(rule_asciidoc_file, 'r') as f:
            text = f.readlines()
        for count, line in enumerate(text):
            # The second line is the "=== <rule name>" heading; the
            # description follows immediately.
            if count == 1:
                name = line.split("=== ")[1].replace("\n", "")
                is_desc = True
            if is_desc:
                description = description + line
                # Description runs until the "*Rule type*" field appears on
                # the next line (lookahead on text[count + 1]).
                if "*Rule type*" in text[count + 1]:
                    is_desc = False
            if "==== Potential false positives" in line:
                is_false_pos = True
                continue
            if is_false_pos:
                false_pos = false_pos + line
                # False-positives text ends at the next section heading.
                if ("==== Rule query" in text[count]) or ("==== Investigation guide" in text[count]) or (
                        "==== Rule version history" in text[count]):
                    is_false_pos = False
            if "==== Investigation guide" in line:
                is_notes = True
                continue
            if is_notes:
                # Captures everything to end-of-file; trimmed further below.
                notes = notes + line
        # Reset section flags for the next file.
        is_notes = False
        is_false_pos = False
        # Drop the heading captured along with the description and collapse
        # blank lines into single newlines.
        description = description.split("\n\n", 1)[1].replace("\n\n", "\n").rstrip()
        # Trim trailing sections that leaked into the notes capture.
        if "[[" in notes:
            notes = notes.split("[[")[0]
        if "\n==== Rule query\n" in notes:
            notes = notes.split("\n==== Rule query\n")[0]
        # Likewise trim section headings captured with the false positives.
        if "==== Rule query\n" in false_pos:
            false_pos = false_pos.split("\n\n==== Rule query\n")[0]
        if "==== Investigation guide\n" in false_pos:
            false_pos = false_pos.split("\n\n==== Investigation guide\n")[0]
        if "[[" in false_pos:
            false_pos = false_pos.split("[[")[0]
        rule_text = {"name": name, "description": description}
        if false_pos:
            rule_text['false_positives'] = [false_pos.lstrip()]
        if notes:
            notes_text = notes.rstrip()
            # Drop a single leading newline left over from the section break.
            notes_text = notes_text[1:] if notes_text.startswith('\n') else notes_text
            rule_text['note'] = notes_text
        rule_dict.append(rule_text)
        # Reset accumulators for the next file.
        name = ""
        description = ""
        false_pos = ""
        notes = ""
    rule_dict = sorted(rule_dict, key=_sort_by_name)
    diff_file = str(PREBUILT_RULES.joinpath('diff-files', 'gen-files', f'json-from-docs-{package_version}.json'))
    with open(diff_file, "w") as f:
        json.dump(rule_dict, f, indent=2)
    click.echo(f'saved file: {diff_file}')
def update_current_text(package_version):
    """Merge curated documentation text back onto the freshly dumped rules.

    For rules that already have documentation, the description, false
    positives and investigation-guide text extracted from the docs take
    precedence over the text shipped with Kibana, so manual doc edits survive
    a release.  The merged full-rule list is written over the json-from-docs
    diff file, which get_rule_diff() consumes next.  New rules are unchanged.
    """
    # Path to latest prebuilt rules JSON file (full rule objects from Kibana).
    rule_dump = str(PREBUILT_RULES.joinpath('orig-rules-json-files', f'{package_version}-prebuilt-rule.json'))
    with open(rule_dump, "r") as source:
        current_text = json.load(source)
    # Path to JSON file generated from existing documentation (only name,
    # description, false_positives and note per entry).
    diff_file = str(PREBUILT_RULES.joinpath('diff-files', 'gen-files', f'json-from-docs-{package_version}.json'))
    with open(diff_file, "r") as source:
        updated_text = json.load(source)
    # BUG FIX: the assignments previously ran in the wrong direction -- they
    # copied Kibana text onto the (then discarded) docs entries, so the file
    # dumped below never actually contained the documentation text.  Copy the
    # doc text onto the full rule objects that are written out instead.
    # NOTE(review): docs-text precedence inferred from the original comment
    # below -- confirm intended precedence with the docs pipeline owners.
    for rule in current_text:
        for doc_entry in updated_text:
            if rule['name'] == doc_entry['name']:
                rule['description'] = doc_entry['description']
                if 'false_positives' in doc_entry and 'false_positives' in rule:
                    rule['false_positives'][0] = doc_entry['false_positives'][0]
                if 'note' in doc_entry:
                    rule['note'] = doc_entry['note']
    # Output file with updated text from the documentation for previously existing
    # prebuilt rules. New rules are unchanged.
    with open(diff_file, "w") as fp:
        json.dump(current_text, fp, indent=2)
    click.echo(f'saved file: {diff_file}')
def get_rule_diff(package_version):
    """Diff this release's rules against the previous release's final file.

    Writes diff-files/final-files/final-rule-file-<package_version>.json,
    carrying forward each rule's `added`, `last_update` and changelog, and
    appending a changelog record for every rule whose version was bumped.
    This output is also the baseline for the next release's diff.
    """
    previous_release = _get_last_release_version()
    err_str = f'Most recent detected version: {previous_release} !< {package_version}. '
    err_str += f'Remove {package_version} generated files (3) or specify a higher version to build'
    # NOTE: assert is stripped under `python -O`; acceptable for a build script.
    assert Version(package_version) > Version(previous_release), err_str
    # Path to the JSON rule file generated for this release
    diff_file = str(PREBUILT_RULES.joinpath('diff-files', 'gen-files', f'json-from-docs-{package_version}.json'))
    with open(diff_file, 'r') as source:
        lasted_rules_dict = json.load(source)
    # Path to the final JSON rule file generated for the previous release
    prev_final = str(PREBUILT_RULES.joinpath('diff-files', 'final-files', f'final-rule-file-{previous_release}.json'))
    with open(prev_final, 'r') as source:
        previous_rules_dict = json.load(source)
    diff_dict = []
    lasted_rules_dict = sorted(lasted_rules_dict, key=_sort_by_name)
    previous_rules_dict = sorted(previous_rules_dict, key=_sort_by_name)
    old_name = None
    old_file_name = None
    # BUG FIX: file_name_changed used to be assigned only inside the rename
    # branch, so the first version-bumped rule without a rename raised
    # NameError, and later rules could read a stale value from an earlier
    # rule.  Initialize it here and reset it after every matched rule.
    file_name_changed = False
    rule_found = False
    for new_rule in lasted_rules_dict:
        for old_rule in previous_rules_dict:
            if old_rule['rule_id'] == new_rule['rule_id']:
                if old_rule['name'] != new_rule['name']:
                    # Remember the rename so the changelog entry can point at
                    # the now-obsolete doc file.
                    old_name = old_rule['name']
                    old_file_name = _convert_name_to_filename(old_rule["name"])
                    new_file_name = _convert_name_to_filename(new_rule["name"])
                    file_name_changed = old_file_name != new_file_name
                    old_rule['name'] = new_rule['name']
                if 'changelog' in old_rule:
                    # Carry the accumulated version history forward.
                    new_rule['changelog'] = old_rule['changelog']
                if old_rule['version'] != new_rule['version']:
                    new_rule['last_update'] = package_version
                    if 'changelog' not in new_rule:
                        new_rule['changelog'] = {}
                        new_rule['changelog']['changes'] = []
                    if 'query' in new_rule:
                        if old_rule['query'] != new_rule['query']:
                            new_rule['changelog']['changes'].append({
                                "version": new_rule['version'],
                                "updated": new_rule['last_update'],
                                "pre_query": old_rule['query'],
                                "doc_text": "Updated query.",
                                "pre_name": old_name
                            })
                        else:
                            new_rule['changelog']['changes'].append({
                                "version": new_rule['version'],
                                "updated": new_rule['last_update'],
                                "pre_query": old_rule['query'],
                                "doc_text": "Formatting only",
                                "pre_name": old_name
                            })
                    else:
                        # Non-query rule (e.g. machine learning).
                        new_rule['changelog']['changes'].append({
                            "version": new_rule['version'],
                            "updated": new_rule['last_update'],
                            "pre_query": "N/A",
                            "doc_text": "Formatting only",
                            "pre_name": old_name
                        })
                    if file_name_changed:
                        new_rule['changelog']['changes'][-1]['duplicate_old_file'] = old_file_name
                else:
                    new_rule['last_update'] = old_rule['last_update']
                diff_dict.append(new_rule)
                new_rule['added'] = old_rule['added']
                old_name = None
                file_name_changed = False
                rule_found = True
        if rule_found is False:
            # Brand-new rule: first seen in this package version.
            new_rule['last_update'] = package_version
            new_rule['added'] = package_version
            diff_dict.append(new_rule)
        rule_found = False
    # Outputs the final JSON file from which the documentation is generated. Note
    # that this file is needed for the next release to compare future changes.
    final = str(PREBUILT_RULES.joinpath('diff-files', 'final-files', f'final-rule-file-{package_version}.json'))
    with open(final, "w") as fp:
        json.dump(diff_dict, fp, indent=2)
    click.echo(f'saved file: {final}')
def create_documentation(package_release):
    """Generate all asciidoc artifacts for *package_release* into generated-ascii-files/.

    Produces: the overview/reference table, one page per rule, the include
    index (ToC), and the per-release changelog page.  Reads the final rule
    file produced by get_rule_diff().
    """

    # Formats text using asciidoc syntax
    def format_text(text):
        return text.replace('\\n', '\n')

    # Path to the generated JSON file
    final_diff = str(PREBUILT_RULES.joinpath('diff-files', 'final-files', f'final-rule-file-{package_release}.json'))
    with open(final_diff, 'r') as source:
        rules_dict = json.load(source)
    sorted_rules = sorted(rules_dict, key=_sort_by_name)
    # Page header for the overview/reference page; _left_align strips the
    # source indentation from the literal at runtime.
    new_text = _left_align("""[[prebuilt-rules]]
    [role="xpack"]
    == Prebuilt rule reference
    This section lists all available prebuilt rules.
    IMPORTANT: To run {ml} prebuilt rules, you must have the
    https://www.elastic.co/subscriptions[appropriate license] or use a
    {ess-trial}[Cloud] deployment. All {ml} prebuilt rules are tagged with `ML`,
    and their rule type is `machine_learning`.
    [width="100%",options="header"]
    |==============================================
    |Rule |Description |Tags |Added |Version
    """)
    # Creates overview table
    for rule in sorted_rules:
        tag_strings = ""
        version_text = ""
        link_string = _convert_name_to_filename(rule["name"])
        # Table row: linked rule name plus a single-spaced, one-line description.
        new_text = new_text + "|<<" + link_string + ", " + rule['name'] + ">> |" + re.sub(' +', ' ',
                                                                                          rule['description'].replace(
                                                                                              '\n', ' '))
        for i in rule['tags']:
            tag_strings = tag_strings + "[" + i + "] "
        if rule['version'] == 1:
            version_text = str(rule['version'])
        # Only link to a version history when a changelog exists for the rule.
        if rule['version'] > 1 and rule.get('changelog'):
            version_text = str(rule['version']) + " <<" + link_string + "-history, Version history>>"
        new_text = new_text + " |" + tag_strings + " |" + rule['added'] + " |" + version_text + "\n\n"
        tag_strings = ""
    new_text = new_text + "|=============================================="
    # Start from a clean output tree on every build.
    shutil.rmtree(str(GENERATED_ASCII), ignore_errors=True)
    GENERATED_ASCII.mkdir(exist_ok=True)
    reference_asciidoc = str(GENERATED_ASCII.joinpath('prebuilt-rules-reference.asciidoc'))
    with open(reference_asciidoc, "w+") as f:
        f.write(new_text)
    # End overview table
    # Create files for each rule and the index (ToC) file
    rule_details_dir = GENERATED_ASCII.joinpath('rule-details')
    rule_details_dir.mkdir(exist_ok=True)
    file_text = ""
    rules_index_file = []
    rule_name_changed = False
    # Maps new rule file name -> obsolete file to delete after a rename.
    files_with_updated_rule_name = {}
    updated_queries = False
    for rule in sorted_rules:
        rule_link = _convert_name_to_filename(rule["name"])
        file_text = "[[" + rule_link + "]]\n=== " + rule['name'] + "\n\n"
        file_text = file_text + format_text(rule['description']) + "\n\n"
        file_text = file_text + "*Rule type*: " + rule['type'] + "\n\n"
        if 'machine_learning_job_id' in rule:
            # can be a list or str
            job_id = rule['machine_learning_job_id']
            jod_id_str = ', '.join(job_id) if isinstance(job_id, list) else job_id
            file_text = file_text + "*Machine learning job*: " + jod_id_str + "\n\n"
            file_text = file_text + "*Machine learning anomaly threshold*: " + str(rule['anomaly_threshold']) + "\n\n"
        if 'index' in rule:
            if len(rule['index']) != 0:
                file_text = file_text + "*Rule indices*:" + "\n\n"
                for i in rule['index']:
                    file_text = file_text + "* " + i + "\n"
            else:
                # NOTE(review): reached only for an empty 'index' value; the
                # string concatenation implies rule['index'] is a str here --
                # confirm against the rule schema.
                file_text = file_text + "*Rule index*: " + rule['index'] + "\n\n"
        file_text = file_text + "\n*Severity*: " + rule['severity'] + "\n\n"
        file_text = file_text + "*Risk score*: " + str(rule['risk_score']) + "\n\n"
        if 'interval' in rule:
            file_text = file_text + "*Runs every*: " + _translate_interval_period(rule['interval']) + "\n\n"
        if 'interval' not in rule:
            # Documented default schedule when the rule defines none.
            file_text = file_text + "*Runs every*: 5 minutes" + "\n\n"
        if 'from' in rule:
            file_text = file_text + "*Searches indices from*: " + rule[
                'from'] + " ({ref}/common-options.html#date-math[Date Math format], see also <<rule-schedule, `Additional look-back time`>>)" + "\n\n"
        if 'from' not in rule:
            file_text = file_text + "*Searches indices from*: now-6m" + " ({ref}/common-options.html#date-math[Date Math format], see also <<rule-schedule, `Additional look-back time`>>)" + "\n\n"
        if 'max_signals' in rule:
            file_text = file_text + "*Maximum alerts per execution*: " + str(rule['max_signals']) + "\n\n"
        if 'max_signals' not in rule:
            file_text = file_text + "*Maximum alerts per execution*: 100" + "\n\n"
        if 'references' in rule:
            if len(rule['references']) != 0:
                file_text = file_text + "*References*:\n\n"
                for i in rule['references']:
                    file_text = file_text + "* " + i + "\n"
            if len(rule['references']) != 0:
                file_text = file_text + "\n"
        file_text = file_text + "*Tags*:\n\n"
        for i in rule['tags']:
            file_text = file_text + "* " + i + "\n"
        if rule['version'] == 1:
            file_text = file_text + "\n*Version*: " + str(rule['version']) + "\n\n"
        # DEBUG
        # if rule['version'] > 1 and not rule.get('changelog'):
        # print(rule_link)
        # NOTE: a rule with version > 1 but no changelog gets no version line
        # at all (see the DEBUG probe above).
        if rule['version'] > 1 and rule.get('changelog'):
            file_text = file_text + "\n*Version*: " + str(
                rule['version']) + " (<<" + rule_link + "-history, version history>>)" + "\n\n"
        file_text = file_text + "*Added ({stack} release)*: " + rule['added'] + "\n\n"
        if rule['version'] > 1:
            file_text = file_text + "*Last modified ({stack} release)*: " + rule['last_update'] + "\n\n"
        file_text = file_text + "*Rule authors*: "
        for count, i in enumerate(rule['author']):
            if count > 0:
                file_text = file_text + ", "
            file_text = file_text + i
        file_text = file_text + "\n\n"
        file_text = file_text + "*Rule license*: " + rule['license'] + "\n"
        if 'false_positives' in rule:
            if len(rule['false_positives']) != 0:
                file_text = file_text + "\n==== Potential false positives" + "\n\n"
                for i in rule['false_positives']:
                    file_text = file_text + format_text(i) + "\n"
        if 'note' in rule:
            file_text = file_text + "\n==== Investigation guide" + "\n\n"
            file_text = file_text + "\n[source,markdown]\n"
            file_text = file_text + "----------------------------------" + "\n"
            file_text = file_text + rule['note'] + "\n"
            file_text = file_text + "----------------------------------" + "\n\n"
        if 'query' in rule:
            file_text = file_text + "\n==== Rule query\n\n"
            file_text = file_text + "\n[source,js]\n"
            file_text = file_text + "----------------------------------" + "\n"
            # Wrap long queries at 70 columns and collapse runs of spaces.
            file_text = file_text + re.sub(' +', ' ', textwrap.fill(rule['query'], width=70)) + "\n"
            file_text = file_text + "----------------------------------" + "\n\n"
        if 'filters' in rule:
            if len(rule['filters']) != 0:
                file_text = file_text + "==== Rule filters" + "\n\n"
                file_text = file_text + "[source,js]\n"
                file_text = file_text + "----------------------------------" + "\n"
                for i in rule['filters']:
                    file_text = file_text + json.dumps(i, sort_keys=True, indent=4) + "\n"
                file_text = file_text + "----------------------------------" + "\n\n"
        if 'threat' in rule:
            if len(rule['threat']) != 0:
                file_text = file_text + "==== Threat mapping" + "\n\n"
                is_first_loop = True
                for i in rule['threat']:
                    # Framework name (and TM mark) printed once, before the
                    # first tactic.
                    if is_first_loop:
                        file_text = file_text + "*Framework*: " + i['framework']
                        is_first_loop = False
                        if i['framework'] == "MITRE ATT&CK":
                            file_text = file_text + "^TM^"
                    file_text = file_text + "\n\n* Tactic:\n"
                    file_text = file_text + "** Name: " + i['tactic']['name'] + "\n"
                    file_text = file_text + "** ID: " + i['tactic']['id'] + "\n"
                    file_text = file_text + "** Reference URL: " + i['tactic']['reference'] + "\n"
                    # Only the first technique entry is documented.
                    if i.get('technique'):
                        file_text = file_text + "* Technique:\n"
                        file_text = file_text + "** Name: " + i['technique'][0]['name'] + "\n"
                        file_text = file_text + "** ID: " + i['technique'][0]['id'] + "\n"
                        file_text = file_text + "** Reference URL: " + i['technique'][0]['reference'] + "\n"
        if 'changelog' in rule:
            identifier = rule_link + "-history"
            file_text = file_text + "\n[[" + identifier + "]]\n"
            file_text = file_text + "==== Rule version history" + "\n\n"
            # Newest change first.
            for i in reversed(rule['changelog']['changes']):
                file_text = file_text + "Version " + str(i['version']) + " (" + i['updated'] + " release)" + "::\n"
                if 'pre_name' in i:
                    if i['pre_name'] != None:
                        file_text = file_text + "* Rule name changed from: " + i['pre_name'] + "\n"
                        rule_name_changed = True
                        # Record the obsolete doc file for deletion below;
                        # only for renames in the release being built.
                        if i['updated'] == package_release:
                            rule_link_file = rule_link + ".asciidoc"
                            files_with_updated_rule_name[rule_link_file] = i.get('duplicate_old_file', i['pre_name'])
                if i['doc_text'] == "Updated query.":
                    if 'pre_name' in i:
                        if i['pre_name'] != None:
                            # Asciidoc list continuation after the rename bullet.
                            file_text = file_text + "+\n"
                    file_text = file_text + "* Updated query, changed from:\n+\n"
                    file_text = file_text + "[source, js]\n"
                    file_text = file_text + "----------------------------------" + "\n"
                    file_text = file_text + re.sub(' +', ' ', textwrap.fill(i['pre_query'], width=70)) + "\n"
                    file_text = file_text + "----------------------------------" + "\n\n"
                    updated_queries = True
                if i['doc_text'] != "Updated query." and rule_name_changed == False:
                    file_text = file_text + "* " + i['doc_text'] + "\n\n"
                rule_name_changed = False
        asciidoc_file = str(rule_details_dir.joinpath(f'{rule_link}.asciidoc'))
        with open(asciidoc_file, "w+") as f:
            f.write(file_text)
        rules_index_file.append("include::rule-details/" + rule_link + ".asciidoc[]")
    # Create index file
    index_file_text = ""
    for index_link in rules_index_file:
        index_file_text += index_link + "\n"
    index_file_write = str(GENERATED_ASCII.joinpath('rule-desc-index.asciidoc'))
    with open(index_file_write, "w+") as index_file_write:
        index_file_write.write(index_file_text)
    # Print files of rules with changed names to terminal
    print('\n')
    for new_file, old_file in sorted(files_with_updated_rule_name.items()):
        print(f'Name of rule changed in {new_file} - removing old file: {old_file}')
        old_path = rule_details_dir.joinpath(f'{old_file}.asciidoc')
        if old_path.exists():
            old_path.unlink()  # unlink(missing_ok=True) only in 3.8+
    print("\n")
    # END: Create files for each rule
    # START: Create rule changelog file. This needs updating each release to add
    # rules changed for the new release.
    version_history_page = _left_align("""[[prebuilt-rules-changelog]]
    == Prebuilt rule changes per release
    The following lists prebuilt rule updates per release. Only rules with
    significant modifications to their query or scope are listed. For detailed
    information about a rule's changes, see the rule's description page.
    """)
    # Rules that have been deleted so there is no need to add them manually after
    # generating the docs
    deleted_rules = _left_align("""
    These prebuilt rules have been removed:
    * Execution via Signed Binary
    * Suspicious Process spawning from Script Interpreter
    * Suspicious Script Object Execution
    These prebuilt rules have been updated:
    """)

    def add_version_updates(*update_versions):
        # Append one "[float] === <version>" section per release, listing a
        # link to every rule that changed meaningfully in that release.
        nonlocal version_history_page
        for update_version in update_versions:
            version_history_page = version_history_page + "[float]\n"
            version_history_page = version_history_page + "=== " + update_version + "\n\n"
            # The 7.7.0 section also lists rules removed in that release.
            if update_version == "7.7.0":
                version_history_page = version_history_page + deleted_rules
            for r in sorted_rules:
                if 'changelog' in r:
                    for i in (r['changelog']['changes']):
                        if i['updated'] == update_version and i['doc_text'] != "Formatting only":
                            link_string = _convert_name_to_filename(r['name'])
                            version_history_page = version_history_page + "<<" + link_string + ">>\n\n"

    # anytime this is built and changes are made to any queries, it will be added as an entry, to be included in future
    # doc generation
    changelog_entries_file = str(PREBUILT_RULES.joinpath('changelog-entries.yml'))
    with open(changelog_entries_file, 'r+') as f:
        changelog_entries = yaml.safe_load(f)
        if updated_queries and package_release not in changelog_entries:
            changelog_entries.append(package_release)
            # Appends to the file: after safe_load the r+ cursor sits at EOF.
            yaml.safe_dump([package_release], f)
            click.echo(f'Changes to queries detected, added: {package_release} to changelog-entries.yml')
    version_updates = Version.sort_as_strings(*changelog_entries, reverse=True)
    add_version_updates(*version_updates)
    reference_asciidoc = str(GENERATED_ASCII.joinpath('prebuilt-rules-changelog.asciidoc'))
    with open(reference_asciidoc, "w+") as f:
        f.write(version_history_page)
@root.command('generate')
@click.argument('package-version')
@click.option('--rules-dir', '-d', default=DEFAULT_KIBANA_RULES_DIR, help='Path of rules in Kibana repo')
@click.option('--local-kibana', '-l', type=click.Path(exists=True, file_okay=False), default=str(DEFAULT_LOCAL_KIBANA),
              help='Location of local kibana repo')
def generate(package_version, rules_dir, local_kibana):
    """Generate pre-built rule documentation."""
    # Stage 1 pulls the raw rules out of the Kibana checkout; the remaining
    # stages run in order, each consuming the previous stage's output files.
    get_release_rules(package_version, local_kibana, rules_dir)
    for stage in (create_json_from_docs, update_current_text, get_rule_diff, create_documentation):
        stage(package_version)
    click.echo('Files staged to generated-ascii-files folder - move these over to docs/detections/prebuilt-rules')
# Script entry point: delegates to the click command group.
if __name__ == '__main__':
    root(prog_name='generate')