detection_rules/navigator.py (220 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Create summary documents for a rule package."""
from functools import reduce
from collections import defaultdict
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import Dict, List, Optional
from marshmallow import pre_load
import json
from .attack import CURRENT_ATTACK_VERSION
from .mixins import MarshmallowDataclassMixin
from .rule import TOMLRule
from .schemas import definitions
_DEFAULT_PLATFORMS = [
"Azure AD",
"Containers",
"Google Workspace",
"IaaS",
"Linux",
"macOS",
"Network",
"Office 365",
"PRE",
"SaaS",
"Windows"
]
_DEFAULT_NAVIGATOR_LINKS = {
"label": "repo",
"url": "https://github.com/elastic/detection-rules"
}
@dataclass
class NavigatorMetadata(MarshmallowDataclassMixin):
"""Metadata for ATT&CK navigator objects."""
name: str
value: str
@dataclass
class NavigatorLinks(MarshmallowDataclassMixin):
"""Metadata for ATT&CK navigator objects."""
label: str
url: str
@dataclass
class Techniques(MarshmallowDataclassMixin):
"""ATT&CK navigator techniques array class."""
techniqueID: str
tactic: str
score: int
metadata: List[NavigatorMetadata]
links: List[NavigatorLinks]
color: str = ''
comment: str = ''
enabled: bool = True
showSubtechniques: bool = False
@pre_load
def set_score(self, data: dict, **kwargs):
data['score'] = len(data['metadata'])
return data
@dataclass
class Navigator(MarshmallowDataclassMixin):
"""ATT&CK navigator class."""
@dataclass
class Versions:
attack: str
layer: str = '4.4'
navigator: str = '4.5.5'
@dataclass
class Filters:
platforms: list = field(default_factory=_DEFAULT_PLATFORMS.copy)
@dataclass
class Layout:
layout: str = 'side'
aggregateFunction: str = 'average'
showID: bool = True
showName: bool = True
showAggregateScores: bool = False
countUnscored: bool = False
@dataclass
class Gradient:
colors: list = field(default_factory=['#d3e0fa', '#0861fb'].copy)
minValue: int = 0
maxValue: int = 10
# not all defaults set
name: str
versions: Versions
techniques: List[Techniques]
# all defaults set
filters: Filters = fields(Filters)
layout: Layout = fields(Layout)
gradient: Gradient = fields(Gradient)
domain: str = 'enterprise-attack'
description: str = 'Elastic detection-rules coverage'
hideDisabled: bool = False
legendItems: list = field(default_factory=list)
links: List[NavigatorLinks] = field(default_factory=[_DEFAULT_NAVIGATOR_LINKS].copy)
metadata: Optional[List[NavigatorLinks]] = field(default_factory=list)
showTacticRowBackground: bool = False
selectTechniquesAcrossTactics: bool = False
selectSubtechniquesWithParent: bool = False
sorting: int = 0
tacticRowBackground: str = '#dddddd'
def technique_dict() -> dict:
return {'metadata': [], 'links': []}
class NavigatorBuilder:
"""Rule navigator mappings and management."""
def __init__(self, detection_rules: List[TOMLRule]):
self.detection_rules = detection_rules
self.layers = {
'all': defaultdict(lambda: defaultdict(technique_dict)),
'platforms': defaultdict(lambda: defaultdict(technique_dict)),
# these will build multiple layers
'indexes': defaultdict(lambda: defaultdict(lambda: defaultdict(technique_dict))),
'tags': defaultdict(lambda: defaultdict(lambda: defaultdict(technique_dict)))
}
self.process_rules()
@staticmethod
def meta_dict(name: str, value: any) -> dict:
meta = {
'name': name,
'value': value
}
return meta
@staticmethod
def links_dict(label: str, url: any) -> dict:
links = {
'label': label,
'url': url
}
return links
def rule_links_dict(self, rule: TOMLRule) -> dict:
"""Create a links dictionary for a rule."""
base_url = 'https://github.com/elastic/detection-rules/blob/main/rules/'
base_path = str(rule.get_base_rule_dir())
if base_path is None:
raise ValueError("Could not find a valid base path for the rule")
url = f'{base_url}{base_path}'
return self.links_dict(rule.name, url)
def get_layer(self, layer_name: str, layer_key: Optional[str] = None) -> dict:
"""Safely retrieve a layer with optional sub-keys."""
return self.layers[layer_name][layer_key] if layer_key else self.layers[layer_name]
def _update_all(self, rule: TOMLRule, tactic: str, technique_id: str):
value = f'{rule.contents.data.type}/{rule.contents.data.get("language")}'
self.add_rule_to_technique(rule, 'all', tactic, technique_id, value)
def _update_platforms(self, rule: TOMLRule, tactic: str, technique_id: str):
value = rule.path.parent.name
self.add_rule_to_technique(rule, 'platforms', tactic, technique_id, value)
def _update_indexes(self, rule: TOMLRule, tactic: str, technique_id: str):
for index in rule.contents.data.get('index') or []:
value = rule.id
self.add_rule_to_technique(rule, 'indexes', tactic, technique_id, value, layer_key=index.lower())
def _update_tags(self, rule: TOMLRule, tactic: str, technique_id: str):
for tag in rule.contents.data.get('tags', []):
value = rule.id
expected_prefixes = set([tag.split(":")[0] + ":" for tag in definitions.EXPECTED_RULE_TAGS])
tag = reduce(lambda s, substr: s.replace(substr, ''), expected_prefixes, tag).lstrip()
layer_key = tag.replace(' ', '-').lower()
self.add_rule_to_technique(rule, 'tags', tactic, technique_id, value, layer_key=layer_key)
def add_rule_to_technique(self,
rule: TOMLRule,
layer_name: str,
tactic: str,
technique_id: str,
value: str,
layer_key: Optional[str] = None):
"""Add a rule to a technique metadata and links."""
layer = self.get_layer(layer_name, layer_key)
layer[tactic][technique_id]['metadata'].append(self.meta_dict(rule.name, value))
layer[tactic][technique_id]['links'].append(self.rule_links_dict(rule))
def process_rule(self, rule: TOMLRule, tactic: str, technique_id: str):
self._update_all(rule, tactic, technique_id)
self._update_platforms(rule, tactic, technique_id)
self._update_indexes(rule, tactic, technique_id)
self._update_tags(rule, tactic, technique_id)
def process_rules(self):
"""Adds rule to each applicable layer, including multi-layers."""
for rule in self.detection_rules:
threat = rule.contents.data.threat
if threat:
for entry in threat:
tactic = entry.tactic.name.lower()
if entry.technique:
for technique_entry in entry.technique:
technique_id = technique_entry.id
self.process_rule(rule, tactic, technique_id)
if technique_entry.subtechnique:
for sub in technique_entry.subtechnique:
self.process_rule(rule, tactic, sub.id)
def build_navigator(self, layer_name: str, layer_key: Optional[str] = None) -> Navigator:
populated_techniques = []
layer = self.get_layer(layer_name, layer_key)
base_name = f'{layer_name}-{layer_key}' if layer_key else layer_name
base_name = base_name.replace('*', 'WILDCARD')
name = f'Elastic-detection-rules-{base_name}'
for tactic, techniques in layer.items():
tactic_normalized = '-'.join(tactic.lower().split())
for technique_id, rules_data in techniques.items():
rules_data.update(tactic=tactic_normalized, techniqueID=technique_id)
techniques = Techniques.from_dict(rules_data)
populated_techniques.append(techniques.to_dict())
base_nav_obj = {
'name': name,
'techniques': populated_techniques,
'versions': {'attack': CURRENT_ATTACK_VERSION}
}
navigator = Navigator.from_dict(base_nav_obj)
return navigator
def build_all(self) -> List[Navigator]:
built = []
for layer_name, data in self.layers.items():
# this is a single layer
if 'defense evasion' in data:
built.append(self.build_navigator(layer_name))
else:
# multi layers
for layer_key, sub_data in data.items():
built.append(self.build_navigator(layer_name, layer_key))
return built
@staticmethod
def _save(built: Navigator, directory: Path, verbose=True) -> Path:
path = directory.joinpath(built.name).with_suffix('.json')
path.write_text(json.dumps(built.to_dict(), indent=2))
if verbose:
print(f'saved: {path}')
return path
def save_layer(self,
layer_name: str,
directory: Path,
layer_key: Optional[str] = None,
verbose=True
) -> (Path, dict):
built = self.build_navigator(layer_name, layer_key)
return self._save(built, directory, verbose), built
def save_all(self, directory: Path, verbose=True) -> Dict[Path, Navigator]:
paths = {}
for built in self.build_all():
path = self._save(built, directory, verbose)
paths[path] = built
return paths