def cobalt_strike()

in cobalt_strike/cobalt_strike.py [0:0]


    def cobalt_strike(self, p: ProcessMemoryPE, match: YaraRuleMatch) -> Config | bool:
        """
        Extract a CobaltStrike beacon configuration from a matched sample and
        reshape it into the extractor's output format.

        The parsed beacon settings are nested under ``self.family``; top-level
        ``urls``, ``hostname`` and (for HTTP/S beacons) ``user_agent`` fields
        are derived from them. Metadata taken from the YARA rule is merged in
        last, so it wins on key collisions.

        :param p: ProcessMemory object that contains matched file/dump representation
        :param match: YARA rule match whose metadata is merged into the result
        :return: config, or an empty dict when the sample holds no parsable
            CobaltStrike configuration
        """

        _info: Config = get_rule_metadata(match)

        _b = p.store()

        # NOTE(review): the literal 4 is presumably the beacon version passed to
        # the parser -- confirm against CobaltStrikeConfigParser's signature.
        _beacon = CobaltStrikeConfigParser(_b, 4)
        try:
            _parsed_config: Config = cast(Config, _beacon.parse_config())
        except ConfigNotFoundError:
            logger.info("Sample did not contain a CobaltStrike config")
            return {}
        except MissingDataSectionError:
            logger.info("CobaltStrike sample did not contain a .data section")
            return {}

        # Work on a copy so hex-encoding the stub does not mutate the parser output
        _proc_inject = dict(_parsed_config["process-inject"])
        if "stub" in _proc_inject:
            _proc_inject["stub"] = cast(bytes, _proc_inject["stub"]).hex()

        beacons: List[str] = _parsed_config["beacontype"]

        _server = _parsed_config["server"]
        _dns = _parsed_config["dns-beacon"]
        _ssh = _parsed_config["ssh"]
        _smb_frame_header = _parsed_config["smb_frame_header"]

        # Populate general-purpose config first
        _config = {
            "family": self.family,
            "urls": [],
            self.family: {
                "beacon_type": beacons,
                "sleep_time": _parsed_config["sleeptime"],
                "jitter": _parsed_config["jitter"],
                "max_get_size": _parsed_config["maxgetsize"],
                "spawn_to": cast(bytes, _parsed_config["spawnto"]).hex(),
                "license_id": _parsed_config["license_id"],
                "cfg_caution": _parsed_config["cfg_caution"],
                "kill_date": _parsed_config["kill_date"],
                "crypto_scheme": _parsed_config["crypto_scheme"],
                "post_exploitation": _parsed_config["post-ex"],
                "stage": _parsed_config["stage"],
                "proxy": _parsed_config["proxy"],
                "process_inject": _proc_inject,
                # HTTP/S and Hybrid Beacon Settings
                "http": {
                    "server": {
                        "hostname": _server["hostname"],
                        "port": _server["port"],
                        "public_key": _server["publickey"].hex(),
                    },
                    "get": _parsed_config["http-get"],
                    "post": _parsed_config["http-post"],
                    "post_chunk": _parsed_config["http_post_chunk"],
                    "host_header": _parsed_config["host_header"],
                    "user_agent": _parsed_config["useragent_header"],
                    "uses_cookies": _parsed_config["uses_cookies"],
                },
                # DNS Beacon settings
                "dns": {
                    "dns_idle": _dns["dns_idle"],
                    "dns_sleep": _dns["dns_sleep"],
                    "max_dns": _dns["maxdns"],
                    "beacon": _dns["beacon"],
                    "get_A": _dns["get_A"],
                    "get_AAAA": _dns["get_AAAA"],
                    "get_TXT": _dns["get_TXT"],
                    "put_metadata": _dns["put_metadata"],
                    "put_output": _dns["put_output"],
                },
                # SMB Beacon settings
                "smb": {
                    "frame_header": cast(bytes, _smb_frame_header).hex()
                    if _smb_frame_header
                    else None,
                    "pipe_name": _parsed_config["pipename"],
                },
                # SSH Client settings
                "ssh": {
                    "hostname": _ssh["hostname"],
                    "port": _ssh["port"],
                    "username": _ssh["username"],
                    "password": _ssh["password"],
                    # Previously copied from the hostname field by mistake
                    "privatekey": _ssh["privatekey"],
                },
                # TCP Options
                "tcp_frame_header": malduck.enhex(
                    _parsed_config["tcp_frame_header"]
                ).decode("utf-8"),
            },
        }

        _http = _config[self.family]["http"]

        # NOTE(review): matching is case-sensitive; confirm the parser emits
        # lower-case beacon type names ("http", "https", "smb").
        schemes = {"http", "https"} & set(beacons)
        if schemes:
            # The server details live in the raw parser output under "server";
            # the family key only exists in the reshaped _config.
            netloc = f"{_server['hostname']}:{_server['port']}"

            # Sorted so the resulting URL order is deterministic
            for scheme in sorted(schemes):
                for verb in ("get", "post"):
                    # (scheme, network location, path, params, query, fragment).
                    # append(), not +=: extending a list with a str would add
                    # one element per character.
                    _config["urls"].append(
                        urlunparse(
                            (scheme.lower(), netloc, _http[verb]["uri"], "", "", "")
                        )
                    )

            # This can be processed by the useragent ingest processor
            _config["user_agent"] = {"original": _parsed_config["useragent_header"]}

        smb_hostname = None
        _pipe_name = _parsed_config["pipename"]
        if "smb" in beacons and _pipe_name:
            # Turn the backslash-separated pipe path into something urlparse
            # understands, then re-emit it with an explicit smb scheme.
            _parts = urlparse(_pipe_name.replace("\\", "/"))._replace(scheme="smb")
            # Append so HTTP/S URLs collected above are kept and "urls" stays a list
            _config["urls"].append(urlunparse(_parts))

            # Parse SMB hostname if not '.'
            smb_hostname = _parts.netloc.split(":")[0]
            if smb_hostname == ".":
                smb_hostname = None

        _config["hostname"] = [
            _host
            for _host in (_server["hostname"], _ssh["hostname"], smb_hostname)
            if _host
        ]

        # Remove the URLs field if it's empty
        if not _config["urls"]:
            del _config["urls"]

        # Right-hand operand wins: rule metadata overrides same-named config keys
        return _config | _info