def _enhance_metadata()

in aristotle/aristotle.py [0:0]


    def _enhance_metadata(self):
        """Enhance metadata on all the rules by adding key-value pairs derived from each rule.

        For every SID in ``self.metadata_dict`` the entry's 'raw_rule' string is
        parsed and the following keys are added through ``self.add_metadata()``:
            - 'cve': CVE values found in the rule (typically msg or references)
            - 'mitre_attack': MITRE ATT&CK values extracted from reference URLs
            - 'hostile': the side opposite to the 'target' keyword value, if present
            - 'protocols': the rule header protocol, the value matched by
              ``app_layer_protocol_re`` (unless negated or "failed"), and protocols
              implied by protocol-specific keywords
            - 'flow': normalized 'flow' keyword values; when the 'flow' keyword gives
              no direction, one is inferred from request/response-only keywords
            - 'detection_direction': derived from the header direction arrow and the
              source/destination IP values as reduced by ``self.reduce_ipval()``

        A rule that does not match ``rule_re`` is reported with
        ``print_error(..., fatal=True)``.
        """
        # The lookup tables below do not depend on the rule, so they are built
        # once here instead of once per rule.

        # (keyword prefix, protocol to record). A keyword implies the protocol when
        # it starts with "<prefix>_" or "<prefix>.". Several prefixes map to a
        # differently named protocol (e.g. ja3/ja3s fingerprints are TLS).
        keyword_protocols = [
            ((prefix + "_", prefix + "."), implied_proto)
            for prefix, implied_proto in (
                ('http', 'http'), ('dns', 'dns'), ('tls', 'tls'), ('ssh', 'ssh'),
                ('snmp', 'snmp'), ('sip', 'sip'), ('rfb', 'rfb'), ('mqtt', 'mqtt'),
                ('http2', 'http2'), ('ja3', 'tls'), ('ja3s', 'tls'), ('dnp3', 'dnp3'),
                ('cip', 'enip'), ('enip', 'enip'), ('ftpdata', 'ftp'), ('krb5', 'kerberos'),
            )
        ]

        # Keywords that force direction (request or response).
        # This hits the most common ones; further checking could be done
        # e.g. mqtt keywords.
        request_keywords = frozenset([
            "http.uri", "http_uri", "http.uri.raw", "http_raw_uri",
            "http.method", "http_method", "http.request_line",
            "http_request_line", "http.request_body", "http_client_body",
            "http.user_agent", "http_user_agent", "http.host", "http_host",
            "http.host.raw", "http_raw_host", "http.accept", "http_accept",
            "http.accept_lang", "http_accept_lang", "http.accept_enc",
            "http_accept_enc", "http.referer", "http_referer", "http.connection",
            "http_connection", "dns.query", "dns_query", "ssh.hassh.string",
            "ja3.hash", "ja3.string", "ftpdata_command", "krb5_cname",
            "sip.method", "sip.uri", "sip.request_line",
        ])
        response_keywords = frozenset([
            "http.stat_msg", "http_stat_msg", "http.stat_code", "http_stat_code",
            "http.response_line", "http_response_line", "http.response_body",
            "http_server_body", "http.server", "http.location", "ssh.hassh.server",
            "ssh.hassh.server.string", "ja3s.hash", "ja3s.string", "krb5_sname",
            "sip.stat_code", "sip.stat_msg", "sip.response_line",
        ])

        for sid, entry in self.metadata_dict.items():
            rule = entry['raw_rule']

            rule_match_obj = rule_re.match(rule)
            if not rule_match_obj:
                print_error("Invalid rule: '{}'".format(rule), fatal=True)
                # Only reached if print_error() returns; without a header match
                # nothing below can be computed for this rule.
                continue

            # find CVE references (often in msg or reference) and add as metadata
            for cve in cve_re_broad.findall(rule):
                # drop the first four characters of the match (presumably the
                # "CVE-" style prefix; depends on cve_re_broad)
                self.add_metadata(sid, "cve", cve[4:])

            # find MITRE ATT&CK URL references, extract the values, and add as metadata
            for mitre in mitre_attack_url_re.findall(rule):
                # keep what follows the second '/', turning remaining '/' into '.'
                val = mitre.split('/', 2)[-1].replace('/', '.')
                self.add_metadata(sid, "mitre_attack", val)

            # find 'target' keyword and convert to 'hostile' BETTER key-value;
            # the hostile side is the opposite of the target side
            target_match_obj = target_keyword_re.search(rule)
            if target_match_obj:
                target = target_match_obj.group("TARGET")
                if target == "src_ip":
                    self.add_metadata(sid, "hostile", "dest_ip")
                elif target == "dest_ip":
                    self.add_metadata(sid, "hostile", "src_ip")
                else:
                    # shouldn't happen b/c Suricata will error on the rule on load
                    print_error("Keyword 'target' found in rule but with invalid value '{}'.".format(target))

            # get rule direction arrow ("->" or "<>")
            direction_arrow = rule_match_obj.group("DIRECTION")

            # get set of keywords (and modifiers, technically): the text before the
            # first ':' of every ';'-separated body chunk longer than one character
            body = rule_match_obj.group("BODY")
            keywords = set(k.split(':')[0].strip() for k in body.split(';') if len(k.strip()) > 1)

            # get/add protocols: header protocol first ...
            proto = rule_match_obj.group("PROTO").lower().strip()
            self.add_metadata(sid, 'protocols', proto)
            # ... then the app-layer protocol match, ignoring negations and "failed" ...
            alproto_match_obj = app_layer_protocol_re.search(rule)
            if alproto_match_obj:
                alproto = alproto_match_obj.group("ALPROTO").lower().strip()
                if not alproto.startswith('!') and alproto != "failed":
                    self.add_metadata(sid, "protocols", alproto)
            # ... then keywords known to be associated with particular protocols
            for prefixes, implied_proto in keyword_protocols:
                if any(k.startswith(prefixes) for k in keywords):
                    self.add_metadata(sid, 'protocols', implied_proto)

            # get flow
            flow_match_obj = flow_re.search(rule)
            if flow_match_obj:
                # normalize so direction is "to_client" or "to_server"
                flow_str = flow_match_obj.group("FLOW").lower()
                flow_str = flow_str.replace("from_server", "to_client").replace("from_client", "to_server")
                direction_found = False
                for flow_val in flow_str.split(','):
                    flow_val = flow_val.strip()
                    self.add_metadata(sid, 'flow', flow_val)
                    if flow_val.startswith("to_"):
                        direction_found = True
                if not direction_found:
                    # no explicit direction; infer it from direction-specific
                    # keywords, request keywords taking precedence
                    if not keywords.isdisjoint(request_keywords):
                        self.add_metadata(sid, 'flow', 'to_server')
                    elif not keywords.isdisjoint(response_keywords):
                        self.add_metadata(sid, 'flow', 'to_client')
                    else:
                        print_debug("Flow direction could not be determined from 'flow' keyword for sid '{}'.".format(sid))
            else:
                print_debug("No 'flow' keyword found for SID '{}'.".format(sid))

            # calculate direction
            sip_reduced = self.reduce_ipval(rule_match_obj.group("SRCIP"))
            dip_reduced = self.reduce_ipval(rule_match_obj.group("DSTIP"))

            # calculate detection direction; possible values:
            # inbound, inbound-notexclusive, outbound, outbound-notexclusive,
            # internal, any, both, unknown
            # NOTE: the order of these checks matters; earlier, more specific
            # combinations take precedence over the single-sided checks below.
            if direction_arrow == "<>":
                detection_direction = "both"
            elif sip_reduced == "any" and dip_reduced == "$HOME_NET":
                detection_direction = "inbound-notexclusive"
            elif sip_reduced == "$HOME_NET" and dip_reduced == "$EXTERNAL_NET":
                detection_direction = "outbound"
            elif sip_reduced == "$HOME_NET" and dip_reduced == "any":
                detection_direction = "outbound-notexclusive"
            elif sip_reduced == "$HOME_NET" and dip_reduced == "$HOME_NET":
                detection_direction = "internal"
            # $EXTERNAL_NET -> $EXTERNAL_NET only going to be seen in spoofed traffic (not TCP); set it to OUTBOUND
            elif dip_reduced == "$EXTERNAL_NET":
                detection_direction = "outbound"
            elif sip_reduced == "$EXTERNAL_NET":
                detection_direction = "inbound"
            elif sip_reduced == "any" and dip_reduced == "any":
                detection_direction = "any"
            else:
                detection_direction = "unknown"
            self.add_metadata(sid, 'detection_direction', detection_direction)
        # TODO: remove duplicates?