detection_rules/main.py (527 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. """CLI commands for detection_rules.""" import dataclasses import glob import json import os import time from datetime import datetime import pytoml from marshmallow_dataclass import class_schema from pathlib import Path from semver import Version from typing import Dict, Iterable, List, Optional, get_args from uuid import uuid4 import click from .action_connector import (TOMLActionConnectorContents, build_action_connector_objects, parse_action_connector_results_from_api) from .attack import build_threat_map_entry from .cli_utils import rule_prompt, multi_collection from .config import load_current_package_version, parse_rules_config from .generic_loader import GenericCollection from .exception import (TOMLExceptionContents, build_exception_objects, parse_exceptions_results_from_api) from .misc import ( add_client, client_error, nested_set, parse_user_config ) from .rule import TOMLRule, TOMLRuleContents, QueryRuleData from .rule_formatter import toml_write from .rule_loader import RuleCollection from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename RULES_CONFIG = parse_rules_config() RULES_DIRS = RULES_CONFIG.rule_dirs @click.group( 'detection-rules', context_settings={ 'help_option_names': ['-h', '--help'], 'max_content_width': int(os.getenv('DR_CLI_MAX_WIDTH', 240)), }, ) @click.option('--debug/--no-debug', '-D/-N', is_flag=True, default=None, help='Print full exception stacktrace on errors') @click.pass_context def root(ctx, debug): """Commands for detection-rules repository.""" debug = debug if debug is not None else parse_user_config().get('debug') ctx.obj = {'debug': debug, 'rules_config': RULES_CONFIG} if debug: click.secho('DEBUG MODE ENABLED', fg='yellow') @root.command('create-rule') @click.argument('path', type=Path) @click.option('--config', '-c', type=click.Path(exists=True, dir_okay=False, path_type=Path), help='Rule or config file') @click.option('--required-only', is_flag=True, help='Only prompt for required fields') @click.option('--rule-type', '-t', type=click.Choice(sorted(TOMLRuleContents.all_rule_types())), help='Type of rule to create') def create_rule(path, config, required_only, rule_type): """Create a detection rule.""" contents = load_rule_contents(config, single_only=True)[0] if config else {} return rule_prompt(path, rule_type=rule_type, required_only=required_only, save=True, **contents) @root.command('generate-rules-index') @click.option('--query', '-q', help='Optional KQL query to limit to specific rules') @click.option('--overwrite', is_flag=True, help='Overwrite files in an existing folder') @click.pass_context def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True): """Generate enriched indexes of rules, based on a KQL search, for indexing/importing into elasticsearch/kibana.""" from .packaging import Package if query: rule_paths = [r['file'] for r in ctx.invoke(search_rules, query=query, verbose=False)] rules = RuleCollection() rules.load_files(Path(p) for p in rule_paths) else: rules = RuleCollection.default() rule_count = len(rules) package = Package(rules, name=load_current_package_version(), verbose=False) package_hash = package.get_package_hash() bulk_upload_docs, importable_rules_docs = package.create_bulk_index_body() if save_files: path = get_path('enriched-rule-indexes', package_hash) path.mkdir(parents=True, exist_ok=overwrite) bulk_upload_docs.dump(path.joinpath('enriched-rules-index-uploadable.ndjson'), sort_keys=True) importable_rules_docs.dump(path.joinpath('enriched-rules-index-importable.ndjson'), sort_keys=True) click.echo(f'files saved to: {path}') click.echo(f'{rule_count} rules included') return bulk_upload_docs, importable_rules_docs @root.command("import-rules-to-repo") @click.argument("input-file", type=click.Path(dir_okay=False, exists=True), nargs=-1, required=False) @click.option("--action-connector-import", "-ac", is_flag=True, help="Include action connectors in export") @click.option("--exceptions-import", "-e", is_flag=True, help="Include exceptions in export") @click.option("--required-only", is_flag=True, help="Only prompt for required fields") @click.option("--directory", "-d", type=click.Path(file_okay=False, exists=True), help="Load files from a directory") @click.option( "--save-directory", "-s", type=click.Path(file_okay=False, exists=True), help="Save imported rules to a directory" ) @click.option( "--exceptions-directory", "-se", type=click.Path(file_okay=False, exists=True), help="Save imported exceptions to a directory", ) @click.option( "--action-connectors-directory", "-sa", type=click.Path(file_okay=False, exists=True), help="Save imported actions to a directory", ) @click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors") @click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one") @click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule") def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool, exceptions_import: bool, directory: click.Path, save_directory: click.Path, action_connectors_directory: click.Path, exceptions_directory: click.Path, skip_errors: bool, default_author: str, strip_none_values: bool): """Import rules from json, toml, or yaml files containing Kibana exported rule(s).""" errors = [] rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else [] rule_files = sorted(set(rule_files + list(input_file))) file_contents = [] for rule_file in rule_files: file_contents.extend(load_rule_contents(Path(rule_file))) if not file_contents: click.echo("Must specify at least one file!") exceptions_containers = {} exceptions_items = {} exceptions_containers, exceptions_items, _, unparsed_results = parse_exceptions_results_from_api(file_contents) action_connectors, unparsed_results = parse_action_connector_results_from_api(unparsed_results) file_contents = unparsed_results exception_list_rule_table = {} action_connector_rule_table = {} rule_count = 0 for contents in file_contents: # Don't load exceptions as rules if contents.get("type") not in get_args(definitions.RuleType): click.echo(f"Skipping - {contents.get("type")} is not a supported rule type") continue base_path = contents.get("name") or contents.get("rule", {}).get("name") base_path = rulename_to_filename(base_path) if base_path else base_path if base_path is None: raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}") rule_path = os.path.join(save_directory if save_directory is not None else RULES_DIRS[0], base_path) # handle both rule json formats loaded from kibana and toml data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id") additional = ["index"] if not data_view_id else ["data_view_id"] # Use additional to store all available fields for the rule additional += [key for key in contents if key not in additional and contents.get(key, None)] # use default author if not provided contents["author"] = contents.get("author") or default_author or [contents.get("created_by")] if isinstance(contents["author"], str): contents["author"] = [contents["author"]] output = rule_prompt( rule_path, required_only=required_only, save=True, verbose=True, additional_required=additional, skip_errors=skip_errors, strip_none_values=strip_none_values, **contents, ) # If output is not a TOMLRule if isinstance(output, str): errors.append(output) else: rule_count += 1 if contents.get("exceptions_list"): # For each item in rule.contents.data.exceptions_list to the exception_list_rule_table under the list_id for exception in contents["exceptions_list"]: exception_id = exception["list_id"] if exception_id not in exception_list_rule_table: exception_list_rule_table[exception_id] = [] exception_list_rule_table[exception_id].append({"id": contents["id"], "name": contents["name"]}) if contents.get("actions"): # If rule has actions with connectors, add them to the action_connector_rule_table under the action_id for action in contents["actions"]: action_id = action["id"] if action_id not in action_connector_rule_table: action_connector_rule_table[action_id] = [] action_connector_rule_table[action_id].append({"id": contents["id"], "name": contents["name"]}) # Build TOMLException Objects if exceptions_import: _, e_output, e_errors = build_exception_objects( exceptions_containers, exceptions_items, exception_list_rule_table, exceptions_directory, save_toml=True, skip_errors=skip_errors, verbose=True, ) for line in e_output: click.echo(line) errors.extend(e_errors) # Build TOMLActionConnector Objects if action_connector_import: _, ac_output, ac_errors = build_action_connector_objects( action_connectors, action_connector_rule_table, action_connectors_directory, save_toml=True, skip_errors=skip_errors, verbose=True, ) for line in ac_output: click.echo(line) errors.extend(ac_errors) exceptions_count = 0 if not exceptions_import else len(exceptions_containers) + len(exceptions_items) click.echo(f"{rule_count + exceptions_count + len(action_connectors)} results exported") click.echo(f"{rule_count} rules converted") click.echo(f"{exceptions_count} exceptions exported") click.echo(f"{len(action_connectors)} actions connectors exported") if errors: err_file = save_directory if save_directory is not None else RULES_DIRS[0] / "_errors.txt" err_file.write_text("\n".join(errors)) click.echo(f"{len(errors)} errors saved to {err_file}") @root.command('build-limited-rules') @click.option('--stack-version', type=click.Choice(all_versions()), required=True, help='Version to downgrade to be compatible with the older instance of Kibana') @click.option('--output-file', '-o', type=click.Path(dir_okay=False, exists=False), required=True) def build_limited_rules(stack_version: str, output_file: str): """ Import rules from json, toml, or Kibana exported rule file(s), filter out unsupported ones, and write to output NDJSON file. """ # Schema generation and incompatible fields detection query_rule_data = class_schema(QueryRuleData)() fields = getattr(query_rule_data, 'fields', {}) incompatible_fields = get_incompatible_fields(list(fields.values()), Version.parse(stack_version, optional_minor_and_patch=True)) # Load all rules rules = RuleCollection.default() # Define output path output_path = Path(output_file) # Define ndjson instance for output ndjson_output = Ndjson() # Get API schema for rule type api_schema = get_schema_file(stack_version, "base")["properties"]["type"]["enum"] # Function to process each rule def process_rule(rule, incompatible_fields: List[str]): if rule.contents.type not in api_schema: click.secho(f'{rule.contents.name} - Skipping unsupported rule type: {rule.contents.get("type")}', fg='yellow') return None # Remove unsupported fields from rule rule_contents = rule.contents.to_api_format() for field in incompatible_fields: rule_contents.pop(field, None) return rule_contents # Process each rule and add to ndjson_output for rule in rules.rules: processed_rule = process_rule(rule, incompatible_fields) if processed_rule is not None: ndjson_output.append(processed_rule) # Write ndjson_output to file ndjson_output.dump(output_path) click.echo(f'Success: Rules written to {output_file}') @root.command('toml-lint') @click.option('--rule-file', '-f', multiple=True, type=click.Path(exists=True), help='Specify one or more rule files.') def toml_lint(rule_file): """Cleanup files with some simple toml formatting.""" if rule_file: rules = RuleCollection() rules.load_files(Path(p) for p in rule_file) else: rules = RuleCollection.default() # re-save the rules to force TOML reformatting for rule in rules: rule.save_toml() click.echo('TOML file linting complete') @root.command('mass-update') @click.argument('query') @click.option('--metadata', '-m', is_flag=True, help='Make an update to the rule metadata rather than contents.') @click.option('--language', type=click.Choice(["eql", "kql"]), default="kql") @click.option('--field', type=(str, str), multiple=True, help='Use rule-search to retrieve a subset of rules and modify values ' '(ex: --field management.ecs_version 1.1.1).\n' 'Note this is limited to string fields only. Nested fields should use dot notation.') @click.pass_context def mass_update(ctx, query, metadata, language, field): """Update multiple rules based on eql results.""" rules = RuleCollection().default() results = ctx.invoke(search_rules, query=query, language=language, verbose=False) matching_ids = set(r["rule_id"] for r in results) rules = rules.filter(lambda r: r.id in matching_ids) for rule in rules: for key, value in field: nested_set(rule.metadata if metadata else rule.contents, key, value) rule.validate(as_rule=True) rule.save(as_rule=True) return ctx.invoke(search_rules, query=query, language=language, columns=['rule_id', 'name'] + [k[0].split('.')[-1] for k in field]) @root.command('view-rule') @click.argument('rule-file', type=Path) @click.option('--api-format/--rule-format', default=True, help='Print the rule in final api or rule format') @click.pass_context def view_rule(ctx, rule_file, api_format): """View an internal rule or specified rule file.""" rule = RuleCollection().load_file(rule_file) if api_format: click.echo(json.dumps(rule.contents.to_api_format(), indent=2, sort_keys=True)) else: click.echo(toml_write(rule.contents.to_dict())) return rule def _export_rules( rules: RuleCollection, outfile: Path, downgrade_version: Optional[definitions.SemVer] = None, verbose=True, skip_unsupported=False, include_metadata: bool = False, include_action_connectors: bool = False, include_exceptions: bool = False, ): """Export rules and exceptions into a consolidated ndjson file.""" from .rule import downgrade_contents_from_rule outfile = outfile.with_suffix('.ndjson') unsupported = [] if downgrade_version: if skip_unsupported: output_lines = [] for rule in rules: try: output_lines.append(json.dumps(downgrade_contents_from_rule(rule, downgrade_version, include_metadata=include_metadata), sort_keys=True)) except ValueError as e: unsupported.append(f'{e}: {rule.id} - {rule.name}') continue else: output_lines = [json.dumps(downgrade_contents_from_rule(r, downgrade_version, include_metadata=include_metadata), sort_keys=True) for r in rules] else: output_lines = [json.dumps(r.contents.to_api_format(include_metadata=include_metadata), sort_keys=True) for r in rules] # Add exceptions to api format here and add to output_lines if include_exceptions or include_action_connectors: cl = GenericCollection.default() # Get exceptions in API format if include_exceptions: exceptions = [d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)] exceptions = [e for sublist in exceptions for e in sublist] output_lines.extend(json.dumps(e, sort_keys=True) for e in exceptions) if include_action_connectors: action_connectors = [ d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents) ] actions = [a for sublist in action_connectors for a in sublist] output_lines.extend(json.dumps(a, sort_keys=True) for a in actions) outfile.write_text('\n'.join(output_lines) + '\n') if verbose: click.echo(f'Exported {len(rules) - len(unsupported)} rules into {outfile}') if skip_unsupported and unsupported: unsupported_str = '\n- '.join(unsupported) click.echo(f'Skipped {len(unsupported)} unsupported rules: \n- {unsupported_str}') @root.command("export-rules-from-repo") @multi_collection @click.option( "--outfile", "-o", default=Path(get_path("exports", f'{time.strftime("%Y%m%dT%H%M%SL")}.ndjson')), type=Path, help="Name of file for exported rules", ) @click.option("--replace-id", "-r", is_flag=True, help="Replace rule IDs with new IDs before export") @click.option( "--stack-version", type=click.Choice(all_versions()), help="Downgrade a rule version to be compatible with older instances of Kibana", ) @click.option( "--skip-unsupported", "-s", is_flag=True, help="If `--stack-version` is passed, skip rule types which are unsupported " "(an error will be raised otherwise)", ) @click.option("--include-metadata", type=bool, is_flag=True, default=False, help="Add metadata to the exported rules") @click.option( "--include-action-connectors", "-ac", type=bool, is_flag=True, default=False, help="Include Action Connectors in export", ) @click.option( "--include-exceptions", "-e", type=bool, is_flag=True, default=False, help="Include Exceptions Lists in export" ) def export_rules_from_repo(rules, outfile: Path, replace_id, stack_version, skip_unsupported, include_metadata: bool, include_action_connectors: bool, include_exceptions: bool) -> RuleCollection: """Export rule(s) and exception(s) into an importable ndjson file.""" assert len(rules) > 0, "No rules found" if replace_id: # if we need to replace the id, take each rule object and create a copy # of it, with only the rule_id field changed old_rules = rules rules = RuleCollection() for rule in old_rules: new_data = dataclasses.replace(rule.contents.data, rule_id=str(uuid4())) new_contents = dataclasses.replace(rule.contents, data=new_data) rules.add_rule(TOMLRule(contents=new_contents)) outfile.parent.mkdir(exist_ok=True) _export_rules( rules=rules, outfile=outfile, downgrade_version=stack_version, skip_unsupported=skip_unsupported, include_metadata=include_metadata, include_action_connectors=include_action_connectors, include_exceptions=include_exceptions, ) return rules @root.command('validate-rule') @click.argument('path') @click.pass_context def validate_rule(ctx, path): """Check if a rule staged in rules dir validates against a schema.""" rule = RuleCollection().load_file(Path(path)) click.echo('Rule validation successful') return rule @root.command('validate-all') def validate_all(): """Check if all rules validates against a schema.""" RuleCollection.default() click.echo('Rule validation successful') @root.command('rule-search') @click.argument('query', required=False) @click.option('--columns', '-c', multiple=True, help='Specify columns to add the table') @click.option('--language', type=click.Choice(["eql", "kql"]), default="kql") @click.option('--count', is_flag=True, help='Return a count rather than table') def search_rules(query, columns, language, count, verbose=True, rules: Dict[str, TOMLRule] = None, pager=False): """Use KQL or EQL to find matching rules.""" from kql import get_evaluator from eql.table import Table from eql.build import get_engine from eql import parse_query from eql.pipes import CountPipe from .rule import get_unique_query_fields flattened_rules = [] rules = rules or {str(rule.path): rule for rule in RuleCollection.default()} for file_name, rule in rules.items(): flat: dict = {"file": os.path.relpath(file_name)} flat.update(rule.contents.to_dict()) flat.update(flat["metadata"]) flat.update(flat["rule"]) tactic_names = [] technique_ids = [] subtechnique_ids = [] for entry in flat['rule'].get('threat', []): if entry["framework"] != "MITRE ATT&CK": continue techniques = entry.get('technique', []) tactic_names.append(entry['tactic']['name']) technique_ids.extend([t['id'] for t in techniques]) subtechnique_ids.extend([st['id'] for t in techniques for st in t.get('subtechnique', [])]) flat.update(techniques=technique_ids, tactics=tactic_names, subtechniques=subtechnique_ids, unique_fields=get_unique_query_fields(rule)) flattened_rules.append(flat) flattened_rules.sort(key=lambda dct: dct["name"]) filtered = [] if language == "kql": evaluator = get_evaluator(query) if query else lambda x: True filtered = list(filter(evaluator, flattened_rules)) elif language == "eql": parsed = parse_query(query, implied_any=True, implied_base=True) evaluator = get_engine(parsed) filtered = [result.events[0].data for result in evaluator(flattened_rules)] if not columns and any(isinstance(pipe, CountPipe) for pipe in parsed.pipes): columns = ["key", "count", "percent"] if count: click.echo(f'{len(filtered)} rules') return filtered if columns: columns = ",".join(columns).split(",") else: columns = ["rule_id", "file", "name"] table = Table.from_list(columns, filtered) if verbose: click.echo_via_pager(table) if pager else click.echo(table) return filtered @root.command('build-threat-map-entry') @click.argument('tactic') @click.argument('technique-ids', nargs=-1) def build_threat_map(tactic: str, technique_ids: Iterable[str]): """Build a threat map entry.""" entry = build_threat_map_entry(tactic, *technique_ids) rendered = pytoml.dumps({'rule': {'threat': [entry]}}) # strip out [rule] cleaned = '\n'.join(rendered.splitlines()[2:]) print(cleaned) return entry @root.command("test") @click.pass_context def test_rules(ctx): """Run unit tests over all of the rules.""" import pytest rules_config = ctx.obj['rules_config'] test_config = rules_config.test_config tests, skipped = test_config.get_test_names(formatted=True) if skipped: click.echo(f'Tests skipped per config ({len(skipped)}):') click.echo('\n'.join(skipped)) clear_caches() if tests: ctx.exit(pytest.main(['-v'] + tests)) else: click.echo('No tests found to execute!') @root.group('typosquat') def typosquat_group(): """Commands for generating typosquat detections.""" @typosquat_group.command('create-dnstwist-index') @click.argument('input-file', type=click.Path(exists=True, dir_okay=False), required=True) @click.pass_context @add_client('elasticsearch', add_func_arg=False) def create_dnstwist_index(ctx: click.Context, input_file: click.Path): """Create a dnstwist index in Elasticsearch to work with a threat match rule.""" from elasticsearch import Elasticsearch es_client: Elasticsearch = ctx.obj['es'] click.echo(f'Attempting to load dnstwist data from {input_file}') dnstwist_data: dict = load_dump(str(input_file)) click.echo(f'{len(dnstwist_data)} records loaded') original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*') click.echo(f'Original domain name identified: {original_domain}') domain = original_domain.split('.')[0] domain_index = f'dnstwist-{domain}' # If index already exists, prompt user to confirm if they want to overwrite if es_client.indices.exists(index=domain_index): if click.confirm( f"dnstwist index: {domain_index} already exists for {original_domain}. Do you want to overwrite?", abort=True): es_client.indices.delete(index=domain_index) fields = [ "dns-a", "dns-aaaa", "dns-mx", "dns-ns", "banner-http", "fuzzer", "original-domain", "dns.question.registered_domain" ] timestamp_field = "@timestamp" mappings = {"mappings": {"properties": {f: {"type": "keyword"} for f in fields}}} mappings["mappings"]["properties"][timestamp_field] = {"type": "date"} es_client.indices.create(index=domain_index, body=mappings) # handle dns.question.registered_domain separately fields.pop() es_updates = [] now = datetime.utcnow() for item in dnstwist_data: if item['fuzzer'] == 'original*': continue record = item.copy() record.setdefault('dns', {}).setdefault('question', {}).setdefault('registered_domain', item.get('domain-name')) for field in fields: record.setdefault(field, None) record['@timestamp'] = now es_updates.extend([{'create': {'_index': domain_index}}, record]) click.echo(f'Indexing data for domain {original_domain}') results = es_client.bulk(body=es_updates) if results['errors']: error = {r['create']['result'] for r in results['items'] if r['create']['status'] != 201} client_error(f'Errors occurred during indexing:\n{error}') click.echo(f'{len(results["items"])} watchlist domains added to index') click.echo('Run `prep-rule` and import to Kibana to create alerts on this index') @typosquat_group.command('prep-rule') @click.argument('author') def prep_rule(author: str): """Prep the detection threat match rule for dnstwist data with a rule_id and author.""" rule_template_file = get_etc_path('rule_template_typosquatting_domain.json') template_rule = json.loads(rule_template_file.read_text()) template_rule.update(author=[author], rule_id=str(uuid4())) updated_rule = get_path('rule_typosquatting_domain.ndjson') updated_rule.write_text(json.dumps(template_rule, sort_keys=True)) click.echo(f'Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes') click.echo('Note: you only need to import and enable this rule one time for all dnstwist-* indexes')