# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""CLI commands for detection_rules."""
import dataclasses
import glob
import json
import os
import time
from datetime import datetime
import pytoml
from marshmallow_dataclass import class_schema
from pathlib import Path
from semver import Version
from typing import Dict, Iterable, List, Optional, get_args
from uuid import uuid4
import click
from .action_connector import (TOMLActionConnectorContents,
build_action_connector_objects, parse_action_connector_results_from_api)
from .attack import build_threat_map_entry
from .cli_utils import rule_prompt, multi_collection
from .config import load_current_package_version, parse_rules_config
from .generic_loader import GenericCollection
from .exception import (TOMLExceptionContents,
build_exception_objects, parse_exceptions_results_from_api)
from .misc import (
add_client, client_error, nested_set, parse_user_config
)
from .rule import TOMLRule, TOMLRuleContents, QueryRuleData
from .rule_formatter import toml_write
from .rule_loader import RuleCollection, update_metadata_from_file
from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file
from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename
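# Rules configuration (custom rule directories, unit test config, etc.) is resolved once at import time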
RULES_CONFIG = parse_rules_config()
RULES_DIRS = RULES_CONFIG.rule_dirs
@click.group(
'detection-rules',
context_settings={
'help_option_names': ['-h', '--help'],
'max_content_width': int(os.getenv('DR_CLI_MAX_WIDTH', 240)),
},
)
@click.option('--debug/--no-debug', '-D/-N', is_flag=True, default=None,
help='Print full exception stacktrace on errors')
@click.pass_context
def root(ctx, debug):
"""Commands for detection-rules repository."""
debug = debug if debug is not None else parse_user_config().get('debug')
ctx.obj = {'debug': debug, 'rules_config': RULES_CONFIG}
if debug:
click.secho('DEBUG MODE ENABLED', fg='yellow')
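# Example invocations (hypothetical paths; assumes the repo's `python -m detection_rules` entry point):
#   python -m detection_rules create-rule rules/windows/my_new_rule.toml
#   python -m detection_rules view-rule rules/windows/my_new_rule.toml --api-format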
@root.command('create-rule')
@click.argument('path', type=Path)
@click.option('--config', '-c', type=click.Path(exists=True, dir_okay=False, path_type=Path),
help='Rule or config file')
@click.option('--required-only', is_flag=True, help='Only prompt for required fields')
@click.option('--rule-type', '-t', type=click.Choice(sorted(TOMLRuleContents.all_rule_types())),
help='Type of rule to create')
def create_rule(path, config, required_only, rule_type):
"""Create a detection rule."""
contents = load_rule_contents(config, single_only=True)[0] if config else {}
return rule_prompt(path, rule_type=rule_type, required_only=required_only, save=True, **contents)
@root.command('generate-rules-index')
@click.option('--query', '-q', help='Optional KQL query to limit to specific rules')
@click.option('--overwrite', is_flag=True, help='Overwrite files in an existing folder')
@click.pass_context
def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True):
"""Generate enriched indexes of rules, based on a KQL search, for indexing/importing into elasticsearch/kibana."""
from .packaging import Package
if query:
rule_paths = [r['file'] for r in ctx.invoke(search_rules, query=query, verbose=False)]
rules = RuleCollection()
rules.load_files(Path(p) for p in rule_paths)
else:
rules = RuleCollection.default()
rule_count = len(rules)
package = Package(rules, name=load_current_package_version(), verbose=False)
package_hash = package.get_package_hash()
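    # build both an uploadable bulk-index body and an importable rules document set from the package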
bulk_upload_docs, importable_rules_docs = package.create_bulk_index_body()
if save_files:
path = get_path('enriched-rule-indexes', package_hash)
path.mkdir(parents=True, exist_ok=overwrite)
bulk_upload_docs.dump(path.joinpath('enriched-rules-index-uploadable.ndjson'), sort_keys=True)
importable_rules_docs.dump(path.joinpath('enriched-rules-index-importable.ndjson'), sort_keys=True)
click.echo(f'files saved to: {path}')
click.echo(f'{rule_count} rules included')
return bulk_upload_docs, importable_rules_docs
@root.command("import-rules-to-repo")
@click.argument("input-file", type=click.Path(dir_okay=False, exists=True), nargs=-1, required=False)
@click.option("--action-connector-import", "-ac", is_flag=True, help="Include action connectors in export")
@click.option("--exceptions-import", "-e", is_flag=True, help="Include exceptions in export")
@click.option("--required-only", is_flag=True, help="Only prompt for required fields")
@click.option("--directory", "-d", type=click.Path(file_okay=False, exists=True), help="Load files from a directory")
@click.option(
"--save-directory", "-s", type=click.Path(file_okay=False, exists=True), help="Save imported rules to a directory"
)
@click.option(
"--exceptions-directory",
"-se",
type=click.Path(file_okay=False, exists=True),
help="Save imported exceptions to a directory",
)
@click.option(
"--action-connectors-directory",
"-sa",
type=click.Path(file_okay=False, exists=True),
help="Save imported actions to a directory",
)
@click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors")
@click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one")
@click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule")
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool,
exceptions_import: bool, directory: click.Path, save_directory: click.Path,
action_connectors_directory: click.Path, exceptions_directory: click.Path,
skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool,
local_updated_date: bool):
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
errors = []
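    # gather rule files from the optional --directory (recursively) plus any explicitly passed input files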
rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else []
rule_files = sorted(set(rule_files + list(input_file)))
file_contents = []
for rule_file in rule_files:
file_contents.extend(load_rule_contents(Path(rule_file)))
if not file_contents:
click.echo("Must specify at least one file!")
exceptions_containers = {}
exceptions_items = {}
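    # split the loaded payloads into exceptions, action connectors, and remaining (rule) results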
exceptions_containers, exceptions_items, _, unparsed_results = parse_exceptions_results_from_api(file_contents)
action_connectors, unparsed_results = parse_action_connector_results_from_api(unparsed_results)
file_contents = unparsed_results
exception_list_rule_table = {}
action_connector_rule_table = {}
rule_count = 0
for contents in file_contents:
# Don't load exceptions as rules
if contents.get("type") not in get_args(definitions.RuleType):
click.echo(f"Skipping - {contents.get("type")} is not a supported rule type")
continue
base_path = contents.get("name") or contents.get("rule", {}).get("name")
base_path = rulename_to_filename(base_path) if base_path else base_path
if base_path is None:
raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}")
rule_path = os.path.join(save_directory if save_directory is not None else RULES_DIRS[0], base_path)
# handle both rule json formats loaded from kibana and toml
data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id")
additional = ["index"] if not data_view_id else ["data_view_id"]
# Use additional to store all available fields for the rule
additional += [key for key in contents if key not in additional and contents.get(key, None)]
# use default author if not provided
contents["author"] = contents.get("author") or default_author or [contents.get("created_by")]
if isinstance(contents["author"], str):
contents["author"] = [contents["author"]]
contents.update(
update_metadata_from_file(
Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
)
)
output = rule_prompt(
rule_path,
required_only=required_only,
save=True,
verbose=True,
additional_required=additional,
skip_errors=skip_errors,
strip_none_values=strip_none_values,
**contents,
)
# If output is not a TOMLRule
if isinstance(output, str):
errors.append(output)
else:
rule_count += 1
if contents.get("exceptions_list"):
                # Add each item in the rule's exceptions_list to exception_list_rule_table under its list_id
for exception in contents["exceptions_list"]:
exception_id = exception["list_id"]
if exception_id not in exception_list_rule_table:
exception_list_rule_table[exception_id] = []
exception_list_rule_table[exception_id].append({"id": contents["id"], "name": contents["name"]})
if contents.get("actions"):
# If rule has actions with connectors, add them to the action_connector_rule_table under the action_id
for action in contents["actions"]:
action_id = action["id"]
if action_id not in action_connector_rule_table:
action_connector_rule_table[action_id] = []
action_connector_rule_table[action_id].append({"id": contents["id"], "name": contents["name"]})
# Build TOMLException Objects
if exceptions_import:
_, e_output, e_errors = build_exception_objects(
exceptions_containers,
exceptions_items,
exception_list_rule_table,
exceptions_directory,
save_toml=True,
skip_errors=skip_errors,
verbose=True,
)
for line in e_output:
click.echo(line)
errors.extend(e_errors)
# Build TOMLActionConnector Objects
if action_connector_import:
_, ac_output, ac_errors = build_action_connector_objects(
action_connectors,
action_connector_rule_table,
action_connectors_directory,
save_toml=True,
skip_errors=skip_errors,
verbose=True,
)
for line in ac_output:
click.echo(line)
errors.extend(ac_errors)
exceptions_count = 0 if not exceptions_import else len(exceptions_containers) + len(exceptions_items)
click.echo(f"{rule_count + exceptions_count + len(action_connectors)} results exported")
click.echo(f"{rule_count} rules converted")
click.echo(f"{exceptions_count} exceptions exported")
click.echo(f"{len(action_connectors)} actions connectors exported")
if errors:
        err_dir = Path(save_directory) if save_directory is not None else RULES_DIRS[0]
        err_file = err_dir / "_errors.txt"
err_file.write_text("\n".join(errors))
click.echo(f"{len(errors)} errors saved to {err_file}")
@root.command('build-limited-rules')
@click.option('--stack-version', type=click.Choice(all_versions()), required=True,
help='Version to downgrade to be compatible with the older instance of Kibana')
@click.option('--output-file', '-o', type=click.Path(dir_okay=False, exists=False), required=True)
def build_limited_rules(stack_version: str, output_file: str):
"""
    Load all rules from the repo, strip fields that are incompatible with the given stack version,
    and write the supported rules to an output NDJSON file.
"""
# Schema generation and incompatible fields detection
query_rule_data = class_schema(QueryRuleData)()
fields = getattr(query_rule_data, 'fields', {})
incompatible_fields = get_incompatible_fields(list(fields.values()),
Version.parse(stack_version, optional_minor_and_patch=True))
# Load all rules
rules = RuleCollection.default()
# Define output path
output_path = Path(output_file)
# Define ndjson instance for output
ndjson_output = Ndjson()
# Get API schema for rule type
api_schema = get_schema_file(stack_version, "base")["properties"]["type"]["enum"]
# Function to process each rule
def process_rule(rule, incompatible_fields: List[str]):
if rule.contents.type not in api_schema:
            click.secho(f'{rule.contents.name} - Skipping unsupported rule type: {rule.contents.type}',
                        fg='yellow')
return None
# Remove unsupported fields from rule
rule_contents = rule.contents.to_api_format()
for field in incompatible_fields:
rule_contents.pop(field, None)
return rule_contents
# Process each rule and add to ndjson_output
for rule in rules.rules:
processed_rule = process_rule(rule, incompatible_fields)
if processed_rule is not None:
ndjson_output.append(processed_rule)
# Write ndjson_output to file
ndjson_output.dump(output_path)
click.echo(f'Success: Rules written to {output_file}')
@root.command('toml-lint')
@click.option('--rule-file', '-f', multiple=True, type=click.Path(exists=True),
help='Specify one or more rule files.')
def toml_lint(rule_file):
"""Cleanup files with some simple toml formatting."""
if rule_file:
rules = RuleCollection()
rules.load_files(Path(p) for p in rule_file)
else:
rules = RuleCollection.default()
# re-save the rules to force TOML reformatting
for rule in rules:
rule.save_toml()
click.echo('TOML file linting complete')
@root.command('mass-update')
@click.argument('query')
@click.option('--metadata', '-m', is_flag=True, help='Make an update to the rule metadata rather than contents.')
@click.option('--language', type=click.Choice(["eql", "kql"]), default="kql")
@click.option('--field', type=(str, str), multiple=True,
help='Use rule-search to retrieve a subset of rules and modify values '
'(ex: --field management.ecs_version 1.1.1).\n'
'Note this is limited to string fields only. Nested fields should use dot notation.')
@click.pass_context
def mass_update(ctx, query, metadata, language, field):
"""Update multiple rules based on eql results."""
    rules = RuleCollection.default()
results = ctx.invoke(search_rules, query=query, language=language, verbose=False)
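    # restrict updates to the rules matched by the search query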
matching_ids = set(r["rule_id"] for r in results)
rules = rules.filter(lambda r: r.id in matching_ids)
for rule in rules:
for key, value in field:
nested_set(rule.metadata if metadata else rule.contents, key, value)
rule.validate(as_rule=True)
rule.save(as_rule=True)
return ctx.invoke(search_rules, query=query, language=language,
columns=['rule_id', 'name'] + [k[0].split('.')[-1] for k in field])
@root.command('view-rule')
@click.argument('rule-file', type=Path)
@click.option('--api-format/--rule-format', default=True, help='Print the rule in final api or rule format')
@click.pass_context
def view_rule(ctx, rule_file, api_format):
"""View an internal rule or specified rule file."""
rule = RuleCollection().load_file(rule_file)
if api_format:
click.echo(json.dumps(rule.contents.to_api_format(), indent=2, sort_keys=True))
else:
click.echo(toml_write(rule.contents.to_dict()))
return rule
def _export_rules(
rules: RuleCollection,
outfile: Path,
downgrade_version: Optional[definitions.SemVer] = None,
verbose=True,
skip_unsupported=False,
include_metadata: bool = False,
include_action_connectors: bool = False,
include_exceptions: bool = False,
):
"""Export rules and exceptions into a consolidated ndjson file."""
from .rule import downgrade_contents_from_rule
outfile = outfile.with_suffix('.ndjson')
unsupported = []
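    # if a target stack version is given, downgrade each rule's contents; otherwise export in current API format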
if downgrade_version:
if skip_unsupported:
output_lines = []
for rule in rules:
try:
output_lines.append(json.dumps(downgrade_contents_from_rule(rule, downgrade_version,
include_metadata=include_metadata),
sort_keys=True))
except ValueError as e:
unsupported.append(f'{e}: {rule.id} - {rule.name}')
continue
else:
output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version,
include_metadata=include_metadata), sort_keys=True)
for r in rules]
else:
output_lines = [json.dumps(r.contents.to_api_format(include_metadata=include_metadata),
sort_keys=True) for r in rules]
# Add exceptions to api format here and add to output_lines
if include_exceptions or include_action_connectors:
cl = GenericCollection.default()
# Get exceptions in API format
if include_exceptions:
exceptions = [d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)]
exceptions = [e for sublist in exceptions for e in sublist]
output_lines.extend(json.dumps(e, sort_keys=True) for e in exceptions)
if include_action_connectors:
action_connectors = [
d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents)
]
actions = [a for sublist in action_connectors for a in sublist]
output_lines.extend(json.dumps(a, sort_keys=True) for a in actions)
outfile.write_text('\n'.join(output_lines) + '\n')
if verbose:
click.echo(f'Exported {len(rules) - len(unsupported)} rules into {outfile}')
if skip_unsupported and unsupported:
unsupported_str = '\n- '.join(unsupported)
click.echo(f'Skipped {len(unsupported)} unsupported rules: \n- {unsupported_str}')
@root.command("export-rules-from-repo")
@multi_collection
@click.option(
"--outfile",
"-o",
default=Path(get_path("exports", f'{time.strftime("%Y%m%dT%H%M%SL")}.ndjson')),
type=Path,
help="Name of file for exported rules",
)
@click.option("--replace-id", "-r", is_flag=True, help="Replace rule IDs with new IDs before export")
@click.option(
"--stack-version",
type=click.Choice(all_versions()),
help="Downgrade a rule version to be compatible with older instances of Kibana",
)
@click.option(
"--skip-unsupported",
"-s",
is_flag=True,
help="If `--stack-version` is passed, skip rule types which are unsupported " "(an error will be raised otherwise)",
)
@click.option("--include-metadata", type=bool, is_flag=True, default=False, help="Add metadata to the exported rules")
@click.option(
"--include-action-connectors",
"-ac",
type=bool,
is_flag=True,
default=False,
help="Include Action Connectors in export",
)
@click.option(
"--include-exceptions", "-e", type=bool, is_flag=True, default=False, help="Include Exceptions Lists in export"
)
def export_rules_from_repo(rules, outfile: Path, replace_id, stack_version, skip_unsupported, include_metadata: bool,
include_action_connectors: bool, include_exceptions: bool) -> RuleCollection:
"""Export rule(s) and exception(s) into an importable ndjson file."""
assert len(rules) > 0, "No rules found"
if replace_id:
# if we need to replace the id, take each rule object and create a copy
# of it, with only the rule_id field changed
old_rules = rules
rules = RuleCollection()
for rule in old_rules:
new_data = dataclasses.replace(rule.contents.data, rule_id=str(uuid4()))
new_contents = dataclasses.replace(rule.contents, data=new_data)
rules.add_rule(TOMLRule(contents=new_contents))
outfile.parent.mkdir(exist_ok=True)
_export_rules(
rules=rules,
outfile=outfile,
downgrade_version=stack_version,
skip_unsupported=skip_unsupported,
include_metadata=include_metadata,
include_action_connectors=include_action_connectors,
include_exceptions=include_exceptions,
)
return rules
@root.command('validate-rule')
@click.argument('path')
@click.pass_context
def validate_rule(ctx, path):
"""Check if a rule staged in rules dir validates against a schema."""
rule = RuleCollection().load_file(Path(path))
click.echo('Rule validation successful')
return rule
@root.command('validate-all')
def validate_all():
"""Check if all rules validates against a schema."""
RuleCollection.default()
click.echo('Rule validation successful')
@root.command('rule-search')
@click.argument('query', required=False)
@click.option('--columns', '-c', multiple=True, help='Specify columns to add the table')
@click.option('--language', type=click.Choice(["eql", "kql"]), default="kql")
@click.option('--count', is_flag=True, help='Return a count rather than table')
def search_rules(query, columns, language, count, verbose=True,
                 rules: Optional[Dict[str, TOMLRule]] = None, pager=False):
"""Use KQL or EQL to find matching rules."""
from kql import get_evaluator
from eql.table import Table
from eql.build import get_engine
from eql import parse_query
from eql.pipes import CountPipe
from .rule import get_unique_query_fields
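    # flatten each rule (metadata + rule fields + ATT&CK threat info) into a single dict for KQL/EQL evaluation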
flattened_rules = []
rules = rules or {str(rule.path): rule for rule in RuleCollection.default()}
for file_name, rule in rules.items():
flat: dict = {"file": os.path.relpath(file_name)}
flat.update(rule.contents.to_dict())
flat.update(flat["metadata"])
flat.update(flat["rule"])
tactic_names = []
technique_ids = []
subtechnique_ids = []
for entry in flat['rule'].get('threat', []):
if entry["framework"] != "MITRE ATT&CK":
continue
techniques = entry.get('technique', [])
tactic_names.append(entry['tactic']['name'])
technique_ids.extend([t['id'] for t in techniques])
subtechnique_ids.extend([st['id'] for t in techniques for st in t.get('subtechnique', [])])
flat.update(techniques=technique_ids, tactics=tactic_names, subtechniques=subtechnique_ids,
unique_fields=get_unique_query_fields(rule))
flattened_rules.append(flat)
flattened_rules.sort(key=lambda dct: dct["name"])
filtered = []
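    # evaluate the query against the flattened rule docs with the appropriate engine (kql evaluator or eql engine)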
if language == "kql":
evaluator = get_evaluator(query) if query else lambda x: True
filtered = list(filter(evaluator, flattened_rules))
elif language == "eql":
parsed = parse_query(query, implied_any=True, implied_base=True)
evaluator = get_engine(parsed)
filtered = [result.events[0].data for result in evaluator(flattened_rules)]
if not columns and any(isinstance(pipe, CountPipe) for pipe in parsed.pipes):
columns = ["key", "count", "percent"]
if count:
click.echo(f'{len(filtered)} rules')
return filtered
if columns:
columns = ",".join(columns).split(",")
else:
columns = ["rule_id", "file", "name"]
table = Table.from_list(columns, filtered)
if verbose:
click.echo_via_pager(table) if pager else click.echo(table)
return filtered
@root.command('build-threat-map-entry')
@click.argument('tactic')
@click.argument('technique-ids', nargs=-1)
def build_threat_map(tactic: str, technique_ids: Iterable[str]):
"""Build a threat map entry."""
entry = build_threat_map_entry(tactic, *technique_ids)
rendered = pytoml.dumps({'rule': {'threat': [entry]}})
# strip out [rule]
cleaned = '\n'.join(rendered.splitlines()[2:])
print(cleaned)
return entry
@root.command("test")
@click.pass_context
def test_rules(ctx):
"""Run unit tests over all of the rules."""
import pytest
rules_config = ctx.obj['rules_config']
test_config = rules_config.test_config
tests, skipped = test_config.get_test_names(formatted=True)
if skipped:
click.echo(f'Tests skipped per config ({len(skipped)}):')
click.echo('\n'.join(skipped))
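    # clear any cached data before running the tests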
clear_caches()
if tests:
ctx.exit(pytest.main(['-v'] + tests))
else:
click.echo('No tests found to execute!')
@root.group('typosquat')
def typosquat_group():
"""Commands for generating typosquat detections."""
@typosquat_group.command('create-dnstwist-index')
@click.argument('input-file', type=click.Path(exists=True, dir_okay=False), required=True)
@click.pass_context
@add_client('elasticsearch', add_func_arg=False)
def create_dnstwist_index(ctx: click.Context, input_file: click.Path):
"""Create a dnstwist index in Elasticsearch to work with a threat match rule."""
from elasticsearch import Elasticsearch
es_client: Elasticsearch = ctx.obj['es']
click.echo(f'Attempting to load dnstwist data from {input_file}')
dnstwist_data: dict = load_dump(str(input_file))
click.echo(f'{len(dnstwist_data)} records loaded')
original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*')
click.echo(f'Original domain name identified: {original_domain}')
domain = original_domain.split('.')[0]
domain_index = f'dnstwist-{domain}'
# If index already exists, prompt user to confirm if they want to overwrite
if es_client.indices.exists(index=domain_index):
if click.confirm(
f"dnstwist index: {domain_index} already exists for {original_domain}. Do you want to overwrite?",
abort=True):
es_client.indices.delete(index=domain_index)
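    # keyword-mapped fields expected from the dnstwist output (plus the ECS registered_domain field)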
fields = [
"dns-a",
"dns-aaaa",
"dns-mx",
"dns-ns",
"banner-http",
"fuzzer",
"original-domain",
"dns.question.registered_domain"
]
timestamp_field = "@timestamp"
mappings = {"mappings": {"properties": {f: {"type": "keyword"} for f in fields}}}
mappings["mappings"]["properties"][timestamp_field] = {"type": "date"}
es_client.indices.create(index=domain_index, body=mappings)
# handle dns.question.registered_domain separately
fields.pop()
es_updates = []
now = datetime.utcnow()
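    # build a bulk request body of alternating action/document entries, skipping the original domain record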
for item in dnstwist_data:
if item['fuzzer'] == 'original*':
continue
record = item.copy()
record.setdefault('dns', {}).setdefault('question', {}).setdefault('registered_domain', item.get('domain-name'))
for field in fields:
record.setdefault(field, None)
record['@timestamp'] = now
es_updates.extend([{'create': {'_index': domain_index}}, record])
click.echo(f'Indexing data for domain {original_domain}')
results = es_client.bulk(body=es_updates)
if results['errors']:
error = {r['create']['result'] for r in results['items'] if r['create']['status'] != 201}
client_error(f'Errors occurred during indexing:\n{error}')
click.echo(f'{len(results["items"])} watchlist domains added to index')
click.echo('Run `prep-rule` and import to Kibana to create alerts on this index')
@typosquat_group.command('prep-rule')
@click.argument('author')
def prep_rule(author: str):
"""Prep the detection threat match rule for dnstwist data with a rule_id and author."""
rule_template_file = get_etc_path('rule_template_typosquatting_domain.json')
template_rule = json.loads(rule_template_file.read_text())
template_rule.update(author=[author], rule_id=str(uuid4()))
updated_rule = get_path('rule_typosquatting_domain.ndjson')
updated_rule.write_text(json.dumps(template_rule, sort_keys=True))
click.echo(f'Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes')
click.echo('Note: you only need to import and enable this rule one time for all dnstwist-* indexes')