cobalt_strike/cobalt_strike.py (144 lines of code) (raw):
import logging
import pprint
from typing import List, cast
from urllib.parse import urlparse, urlunparse
import malduck
from libcsce.error import ConfigNotFoundError, MissingDataSectionError
from libcsce.parser import CobaltStrikeConfigParser
from malduck.extractor import Extractor
from malduck.procmem import ProcessMemoryPE
from malduck.yara import YaraRuleMatch
from ..utils import Config, get_rule_metadata
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class CobaltStrike(Extractor):
    """Malduck extractor that reshapes a parsed Cobalt Strike beacon configuration.

    The single rule handler runs the libcsce ``CobaltStrikeConfigParser`` over the
    matched PE, copies the parsed settings into a nested dictionary stored under
    the family name, and derives the top-level ``urls``, ``hostname`` and
    ``user_agent`` fields from it.
    """

    family: str = "cobalt_strike"
    yara_rules = ("cobalt_strike",)
    overrides = []
    info: Config = {}

    @Extractor.needs_pe
    @Extractor.rule
    def cobalt_strike(self, p: ProcessMemoryPE, match: YaraRuleMatch) -> Config | bool:
        """
        Extract the beacon configuration from a sample matched by the YARA rule.

        :param p: ProcessMemory object that contains matched file/dump representation
        :param match: YARA rule match; its metadata is merged into the result
        :return: config dictionary merged with the rule metadata, or an empty
            dictionary when the parser reports that no config or no ``.data``
            section is present
        """
        _info: Config = get_rule_metadata(match)
        _b = p.store()
        # NOTE(review): the second argument is presumably the Cobalt Strike
        # major version -- confirm against the libcsce parser signature.
        _beacon = CobaltStrikeConfigParser(_b, 4)
        try:
            _parsed_config: Config = cast(Config, _beacon.parse_config())
        except ConfigNotFoundError:
            logger.info("Sample did not contain a CobaltStrike config")
            return {}
        except MissingDataSectionError:
            logger.info("CobaltStrike sample did not contain a .data section")
            return {}

        # Work on a copy so the parser's own dictionary is not mutated when the
        # raw "stub" bytes are replaced by their hex representation.
        _proc_inject = dict(_parsed_config["process-inject"])
        if "stub" in _proc_inject:
            _proc_inject["stub"] = cast(bytes, _proc_inject["stub"]).hex()

        beacons: List[str] = _parsed_config["beacontype"]
        _server = _parsed_config["server"]
        _dns = _parsed_config["dns-beacon"]
        _ssh = _parsed_config["ssh"]
        _smb_header = _parsed_config["smb_frame_header"]

        # Populate general-purpose config first
        _config = {
            "family": self.family,
            "urls": [],
            self.family: {
                "beacon_type": beacons,
                "sleep_time": _parsed_config["sleeptime"],
                "jitter": _parsed_config["jitter"],
                "max_get_size": _parsed_config["maxgetsize"],
                "spawn_to": cast(bytes, _parsed_config["spawnto"]).hex(),
                "license_id": _parsed_config["license_id"],
                "cfg_caution": _parsed_config["cfg_caution"],
                "kill_date": _parsed_config["kill_date"],
                "crypto_scheme": _parsed_config["crypto_scheme"],
                "post_exploitation": _parsed_config["post-ex"],
                "stage": _parsed_config["stage"],
                "proxy": _parsed_config["proxy"],
                "process_inject": _proc_inject,
                # HTTP/S and Hybrid Beacon Settings
                "http": {
                    "server": {
                        "hostname": _server["hostname"],
                        "port": _server["port"],
                        "public_key": _server["publickey"].hex(),
                    },
                    "get": _parsed_config["http-get"],
                    "post": _parsed_config["http-post"],
                    "post_chunk": _parsed_config["http_post_chunk"],
                    "host_header": _parsed_config["host_header"],
                    "user_agent": _parsed_config["useragent_header"],
                    "uses_cookies": _parsed_config["uses_cookies"],
                },
                # DNS Beacon settings
                "dns": {
                    "dns_idle": _dns["dns_idle"],
                    "dns_sleep": _dns["dns_sleep"],
                    "max_dns": _dns["maxdns"],
                    "beacon": _dns["beacon"],
                    "get_A": _dns["get_A"],
                    "get_AAAA": _dns["get_AAAA"],
                    "get_TXT": _dns["get_TXT"],
                    "put_metadata": _dns["put_metadata"],
                    "put_output": _dns["put_output"],
                },
                # SMB Beacon settings
                "smb": {
                    "frame_header": cast(bytes, _smb_header).hex() if _smb_header else None,
                    "pipe_name": _parsed_config["pipename"],
                },
                # SSH Client settings
                "ssh": {
                    "hostname": _ssh["hostname"],
                    "port": _ssh["port"],
                    "username": _ssh["username"],
                    "password": _ssh["password"],
                    # Read the key itself; this field must not mirror the hostname.
                    "privatekey": _ssh["privatekey"],
                },
                # TCP Options
                "tcp_frame_header": malduck.enhex(
                    _parsed_config["tcp_frame_header"]
                ).decode("utf-8"),
            },
        }

        # Compare beacon type names case-insensitively so that both "HTTP" and
        # "http" select the HTTP branch; the URL scheme is always lower case.
        # NOTE(review): confirm the exact spelling libcsce uses for beacon types.
        beacon_kinds = {str(kind).lower() for kind in beacons}

        # Sorted so the order of the generated URLs is stable between runs.
        schemes = sorted({"http", "https"} & beacon_kinds)
        if schemes:
            http_settings = _config[self.family]["http"]
            netloc = f"{_server['hostname']}:{_server['port']}"
            for scheme in schemes:
                for verb in ("get", "post"):
                    # (scheme, network location, path, params, query, fragment).
                    # append(), not +=: extending a list with a str would add
                    # one element per character.
                    _config["urls"].append(
                        urlunparse((scheme, netloc, http_settings[verb]["uri"], "", "", ""))
                    )
            # This can be processed by the useragent ingest processor
            _config["user_agent"] = {"original": _parsed_config["useragent_header"]}

        smb_hostname = None
        pipe_name = _config[self.family]["smb"]["pipe_name"]
        # Skip the SMB URL when no pipe name was parsed; there is nothing to build.
        if "smb" in beacon_kinds and pipe_name:
            # Backslashes are turned into slashes so the pipe path can be split
            # by urlparse() into host and path parts.
            _parts = urlparse(pipe_name.replace("\\", "/"))._asdict()
            _parts["scheme"] = "smb"
            # Append so that URLs collected above are kept and "urls" stays a list.
            _config["urls"].append(urlunparse(tuple(_parts.values())))
            # Parse SMB hostname if not '.'
            smb_hostname = _parts["netloc"].split(":")[0]
            if smb_hostname == ".":
                smb_hostname = None

        # Collect every non-empty hostname found in the configuration.
        _config["hostname"] = [
            host for host in (_server["hostname"], _ssh["hostname"], smb_hostname) if host
        ]

        # Remove the URLs field if it's empty
        if not _config["urls"]:
            del _config["urls"]

        # On a key collision the rule metadata wins over the extracted values.
        return _config | _info