in aristotle/aristotle.py [0:0]
def _enhance_metadata(self):
    """ Enhance metadata on all the rules by adding key-value pairs derived from the rule.

    For every SID in ``self.metadata_dict`` the raw rule text is inspected and
    the following metadata keys are added through ``self.add_metadata()``:

    - 'cve': CVE values found in the rule (typically msg or references)
    - 'mitre_attack': values extracted from MITRE ATT&CK URLs (typically references)
    - 'hostile': the side opposite to the rule's 'target' keyword, when present
    - 'protocols': the rule header protocol, the value matched by
      ``app_layer_protocol_re`` (unless negated or "failed"), and protocols
      implied by protocol-specific keywords in the rule body
    - 'flow': each value of the 'flow' keyword (normalized to "to_client" /
      "to_server"), plus a direction inferred from request/response keywords
      when the 'flow' keyword itself does not give one
    - 'detection_direction': one of inbound, inbound-notexclusive, outbound,
      outbound-notexclusive, internal, any, both, unknown

    Returns None; all results are recorded via ``self.add_metadata()``.
    """
    # The lookup tables below do not depend on the rule, so build them once
    # instead of once per rule.

    # keyword prefixes known to be associated with particular protocols
    known_protocols = ('http', 'dns', 'tls', 'ssh', 'snmp', 'sip', 'rfb', 'mqtt', 'http2',
                       'ja3', 'dnp3', 'cip', 'enip', 'ftpdata', 'krb5')
    # keyword prefixes whose protocol name differs from the prefix itself
    protocol_aliases = {"ja3": "tls", "cip": "enip", "ftpdata": "ftp", "krb5": "kerberos"}
    # Keywords that force direction (request or response).
    # This hits the most common ones; further checking could be done
    # e.g. mqtt keywords.
    request_keywords = frozenset([
        "http.uri", "http_uri", "http.uri.raw", "http_raw_uri",
        "http.method", "http_method", "http.request_line",
        "http_request_line", "http.request_body", "http_client_body",
        "http.user_agent", "http_user_agent", "http.host", "http_host",
        "http.host.raw", "http_raw_host", "http.accept", "http_accept",
        "http.accept_lang", "http_accept_lang", "http.accept_enc",
        "http_accept_enc", "http.referer", "http_referer", "http.connection",
        "http_connection", "dns.query", "dns_query", "ssh.hassh.string",
        "ja3.hash", "ja3.string", "ftpdata_command", "krb5_cname",
        "sip.method", "sip.uri", "sip.request_line",
    ])
    response_keywords = frozenset([
        "http.stat_msg", "http_stat_msg", "http.stat_code", "http_stat_code",
        "http.response_line", "http_response_line", "http.response_body",
        "http_server_body", "http.server", "http.location", "ssh.hassh.server",
        "ssh.hassh.server.string", "ja3s.hash", "ja3s.string", "krb5_sname",
        "sip.stat_code", "sip.stat_msg", "sip.response_line",
    ])

    for sid in self.metadata_dict.keys():
        rule = self.metadata_dict[sid]['raw_rule']
        rule_match_obj = rule_re.match(rule)
        if not rule_match_obj:
            print_error("Invalid rule: '{}'".format(rule), fatal=True)
            # A fatal error is presumably terminal, but if print_error() ever
            # returns there is no match object to read the header from, so
            # skip this rule rather than fail on None.group() further down.
            continue

        # find CVE references (often in msg or reference) and add as metadata
        for cve in cve_re_broad.findall(rule):
            # drop the first four characters (presumably the "CVE-" prefix;
            # confirm against cve_re_broad)
            self.add_metadata(sid, "cve", cve[4:])

        # find MITRE ATT&CK URL references, extract the values, and add as metadata
        for mitre in mitre_attack_url_re.findall(rule):
            # keep what follows the second '/', with any remaining '/' turned into '.'
            val = mitre.split('/', 2)[-1].replace('/', '.')
            self.add_metadata(sid, "mitre_attack", val)

        # find 'target' keyword and convert to 'hostile' BETTER key-value;
        # the hostile side is the opposite of the target side
        target_match_obj = target_keyword_re.search(rule)
        if target_match_obj:
            target = target_match_obj.group("TARGET")
            if target == "src_ip":
                self.add_metadata(sid, "hostile", "dest_ip")
            elif target == "dest_ip":
                self.add_metadata(sid, "hostile", "src_ip")
            else:
                # shouldn't happen b/c Suricata will error on the rule on load
                print_error("Keyword 'target' found in rule but with invalid value '{}'.".format(target))

        # get rule direction arrow ("->" or "<>")
        direction_arrow = rule_match_obj.group("DIRECTION")

        # get set of keywords (and modifiers, technically); only membership and
        # prefix tests are done on it, so a set is sufficient
        body = rule_match_obj.group("BODY")
        keywords = set(k.split(':')[0].strip() for k in body.split(';') if len(k.strip()) > 1)

        # get/add protocols: first the one from the rule header ...
        proto = rule_match_obj.group("PROTO").lower().strip()
        self.add_metadata(sid, 'protocols', proto)
        # ... then the one matched by app_layer_protocol_re, unless it is
        # negated ('!' prefix) or the special value "failed" ...
        match_obj = app_layer_protocol_re.search(rule)
        if match_obj:
            proto = match_obj.group("ALPROTO").lower().strip()
            if not proto.startswith('!') and proto != "failed":
                self.add_metadata(sid, "protocols", proto)
        # ... then those implied by keywords named "<proto>_..." or "<proto>...."
        for app_proto in known_protocols:
            prefixes = ("{}_".format(app_proto), "{}.".format(app_proto))
            if any(k.startswith(prefixes) for k in keywords):
                self.add_metadata(sid, 'protocols', protocol_aliases.get(app_proto, app_proto))

        # get flow
        match_obj = flow_re.search(rule)
        if match_obj:
            # normalize so direction is "to_client" or "to_server"
            flow_str = match_obj.group("FLOW").lower()
            flow_str = flow_str.replace("from_server", "to_client").replace("from_client", "to_server")
            direction_found = False
            for v in flow_str.split(','):
                v = v.strip()
                self.add_metadata(sid, 'flow', v)
                if v.startswith("to_"):
                    direction_found = True
            if not direction_found:
                # 'flow' gave no direction; fall back to keywords that force
                # one, checking request keywords before response keywords
                if not keywords.isdisjoint(request_keywords):
                    self.add_metadata(sid, 'flow', 'to_server')
                elif not keywords.isdisjoint(response_keywords):
                    self.add_metadata(sid, 'flow', 'to_client')
                else:
                    print_debug("Flow direction could not be determined from 'flow' keyword for sid '{}'.".format(sid))
        else:
            print_debug("No 'flow' keyword found for SID '{}'.".format(sid))

        # calculate direction from the reduced source and destination IP values
        sip_val = rule_match_obj.group("SRCIP")
        dip_val = rule_match_obj.group("DSTIP")
        sip_reduced = self.reduce_ipval(sip_val)
        dip_reduced = self.reduce_ipval(dip_val)
        # calculate detection direction; possible values:
        #   inbound, inbound-notexclusive, outbound, outbound-notexclusive,
        #   internal, any, both, unknown
        # The order of the checks matters: the specific source/destination
        # pairs must be tested before the single-sided fallbacks.
        if direction_arrow == "<>":
            detection_direction = "both"
        elif sip_reduced == "any" and dip_reduced == "$HOME_NET":
            detection_direction = "inbound-notexclusive"
        elif sip_reduced == "$HOME_NET" and dip_reduced == "$EXTERNAL_NET":
            detection_direction = "outbound"
        elif sip_reduced == "$HOME_NET" and dip_reduced == "any":
            detection_direction = "outbound-notexclusive"
        elif sip_reduced == "$HOME_NET" and dip_reduced == "$HOME_NET":
            detection_direction = "internal"
        # $EXTERNAL_NET -> $EXTERNAL_NET only going to be seen in spoofed traffic (not TCP); set it to OUTBOUND
        elif dip_reduced == "$EXTERNAL_NET":
            detection_direction = "outbound"
        elif sip_reduced == "$EXTERNAL_NET":
            detection_direction = "inbound"
        elif sip_reduced == "any" and dip_reduced == "any":
            detection_direction = "any"
        else:
            detection_direction = "unknown"
        self.add_metadata(sid, 'detection_direction', detection_direction)
    # TODO: remove duplicates?