in detection_rules/devtools.py [0:0]
def update_attack_in_rules() -> List[Optional[TOMLRule]]:
    """Update threat mappings attack data in all rules.

    Every rule in the default ``RuleCollection`` is inspected. A threat entry is
    considered stale when one of its technique/subtechnique IDs appears in the
    redirect data returned by ``attack.load_techniques_redirect()``, or when one
    of its recorded names is not among the names ``attack.technique_lookup``
    holds for the entry's IDs. Stale entries are rebuilt with
    ``attack.build_threat_map_entry``; entries that pass both checks are kept
    unchanged. Rules with at least one stale entry get a new ``updated_date``
    and are saved with ``save_toml()``.

    Returns:
        The list of rules that were rebuilt and saved (empty if none changed).

    Raises:
        ValueError: if ``attack.build_threat_map_entry`` rejects a tactic/technique
            combination; the message is prefixed with the rule id and name and
            the original error is chained as the cause.
    """
    new_rules = []
    redirected_techniques = attack.load_techniques_redirect()
    today = time.strftime('%Y/%m/%d')

    rules = RuleCollection.default()

    for rule in rules.rules:
        needs_update = False
        valid_threat: List[ThreatMapping] = []
        # tactic name -> flat list of technique + subtechnique IDs to rebuild
        # NOTE(review): two stale entries sharing a tactic overwrite each other
        # here (behavior kept from the original) - confirm that rules never
        # carry more than one entry per tactic.
        threat_pending_update = {}
        threat = rule.contents.data.threat or []

        for entry in threat:
            tactic = entry.tactic.name
            technique_ids = []
            technique_names = []
            # Flatten techniques and their subtechniques, keeping each
            # technique immediately followed by its own subtechniques.
            for technique in entry.technique or []:
                subtechniques = technique.subtechnique or []
                technique_ids.append(technique.id)
                technique_names.append(technique.name)
                technique_ids.extend(st.id for st in subtechniques)
                technique_names.extend(st.name for st in subtechniques)

            # Track staleness per entry so that an entry flagged by *either*
            # check is never also kept as-is (which would leave the stale entry
            # next to its rebuilt replacement).
            entry_needs_update = False

            # check redirected techniques by ID
            # redirected techniques are technique IDs that have changed but represent the same technique
            if any(tid in redirected_techniques for tid in technique_ids):
                entry_needs_update = True
                click.echo(f"'{rule.contents.name}' requires update - technique ID change")

            # check for name change
            # happens if technique ID is the same but name changes
            # NOTE(review): an ID missing from attack.technique_lookup raises KeyError here.
            expected_technique_names = {attack.technique_lookup[str(tid)]["name"] for tid in technique_ids}
            if any(tname not in expected_technique_names for tname in technique_names):
                entry_needs_update = True
                click.echo(f"'{rule.contents.name}' requires update - technique name change")

            if entry_needs_update:
                needs_update = True
                threat_pending_update[tactic] = technique_ids
            else:
                valid_threat.append(entry)

        if not needs_update:
            continue

        for tactic, techniques in threat_pending_update.items():
            try:
                updated_threat = attack.build_threat_map_entry(tactic, *techniques)
            except ValueError as err:
                # Add rule context while preserving the original traceback.
                raise ValueError(f'{rule.id} - {rule.name}: {err}') from err

            valid_threat.append(ThreatMapping.from_dict(updated_threat))

        # Rebuild the rule from copies carrying the refreshed threat data and date.
        new_meta = dataclasses.replace(rule.contents.metadata, updated_date=today)
        new_data = dataclasses.replace(rule.contents.data, threat=valid_threat)
        new_contents = dataclasses.replace(rule.contents, data=new_data, metadata=new_meta)
        new_rule = TOMLRule(contents=new_contents, path=rule.path)
        new_rule.save_toml()
        new_rules.append(new_rule)

    if new_rules:
        click.echo(f'\nFinished - {len(new_rules)} rules updated!')
    else:
        click.echo('No rule changes needed')
    return new_rules