in cobalt_strike/cobalt_strike.py [0:0]
def cobalt_strike(self, p: ProcessMemoryPE, match: YaraRuleMatch) -> Config | bool:
    """
    Parse the CobaltStrike beacon configuration out of a matched sample and
    reshape it into the extractor's output layout.

    :param p: ProcessMemory object that contains matched file/dump representation
    :param match: YARA rule match; the result of ``get_rule_metadata(match)``
        is merged over the extracted config before it is returned
    :return: config, or an empty dict when the parser reports that the sample
        has no CobaltStrike config or no ``.data`` section
    """
    _info: Config = get_rule_metadata(match)
    _b = p.store()
    # NOTE(review): the second argument is presumably the beacon version -- confirm
    # against CobaltStrikeConfigParser.
    _beacon = CobaltStrikeConfigParser(_b, 4)
    try:
        _parsed_config: Config = cast(Config, _beacon.parse_config())
    except ConfigNotFoundError:
        logger.info("Sample did not contain a CobaltStrike config")
        return {}
    except MissingDataSectionError:
        logger.info("CobaltStrike sample did not contain a .data section")
        return {}

    # Work on a copy so hex-encoding the stub does not mutate the parser output.
    _proc_inject = dict(_parsed_config["process-inject"])
    if "stub" in _proc_inject:
        _proc_inject["stub"] = cast(bytes, _proc_inject["stub"]).hex()

    beacons: List[str] = _parsed_config["beacontype"]
    server = _parsed_config["server"]
    dns = _parsed_config["dns-beacon"]
    ssh = _parsed_config["ssh"]
    http_get = _parsed_config["http-get"]
    http_post = _parsed_config["http-post"]
    pipe_name = _parsed_config["pipename"]
    smb_frame_header = _parsed_config["smb_frame_header"]

    # Populate general-purpose config first
    _config = {
        "family": self.family,
        "urls": [],
        self.family: {
            "beacon_type": beacons,
            "sleep_time": _parsed_config["sleeptime"],
            "jitter": _parsed_config["jitter"],
            "max_get_size": _parsed_config["maxgetsize"],
            "spawn_to": cast(bytes, _parsed_config["spawnto"]).hex(),
            "license_id": _parsed_config["license_id"],
            "cfg_caution": _parsed_config["cfg_caution"],
            "kill_date": _parsed_config["kill_date"],
            "crypto_scheme": _parsed_config["crypto_scheme"],
            "post_exploitation": _parsed_config["post-ex"],
            "stage": _parsed_config["stage"],
            "proxy": _parsed_config["proxy"],
            "process_inject": _proc_inject,
            # HTTP/S and Hybrid Beacon Settings
            "http": {
                "server": {
                    "hostname": server["hostname"],
                    "port": server["port"],
                    "public_key": server["publickey"].hex(),
                },
                "get": http_get,
                "post": http_post,
                "post_chunk": _parsed_config["http_post_chunk"],
                "host_header": _parsed_config["host_header"],
                "user_agent": _parsed_config["useragent_header"],
                "uses_cookies": _parsed_config["uses_cookies"],
            },
            # DNS Beacon settings
            "dns": {
                "dns_idle": dns["dns_idle"],
                "dns_sleep": dns["dns_sleep"],
                "max_dns": dns["maxdns"],
                "beacon": dns["beacon"],
                "get_A": dns["get_A"],
                "get_AAAA": dns["get_AAAA"],
                "get_TXT": dns["get_TXT"],
                "put_metadata": dns["put_metadata"],
                "put_output": dns["put_output"],
            },
            # SMB Beacon settings
            "smb": {
                "frame_header": (
                    cast(bytes, smb_frame_header).hex() if smb_frame_header else None
                ),
                "pipe_name": pipe_name,
            },
            # SSH Client settings
            "ssh": {
                "hostname": ssh["hostname"],
                "port": ssh["port"],
                "username": ssh["username"],
                "password": ssh["password"],
                # Previously this was filled from ssh["hostname"] (copy-paste slip).
                # NOTE(review): assumes the parser exposes the key under
                # "privatekey" -- confirm; .get() yields None if it does not.
                "privatekey": ssh.get("privatekey"),
            },
            # TCP Options
            "tcp_frame_header": malduck.enhex(
                _parsed_config["tcp_frame_header"]
            ).decode("utf-8"),
        },
    }

    # Sorted so the order of the emitted URLs is deterministic across runs.
    schemes = sorted({"http", "https"} & set(beacons))
    if schemes:
        # The raw parser output is flat; only _config nests under self.family.
        netloc = f"{server['hostname']}:{server['port']}"
        for scheme in schemes:
            for uri in (http_get["uri"], http_post["uri"]):
                # (scheme, network location, path, params, query, fragment).
                # append(), not +=: extending a list with a str adds it
                # character by character.
                _config["urls"].append(
                    urlunparse((scheme, netloc, uri, "", "", ""))
                )
        # This can be processed by the useragent ingest processor
        _config["user_agent"] = {"original": _parsed_config["useragent_header"]}

    smb_hostname = None
    if "smb" in beacons:
        # Turn the backslash-separated pipe path into a URL-like string so
        # urlparse can split host and path.
        _parts = urlparse(pipe_name.replace("\\", "/"))._replace(scheme="smb")
        # Append so HTTP/S URLs collected above are kept.
        _config["urls"].append(urlunparse(_parts))
        # Parse SMB hostname if not '.'
        smb_hostname = _parts.netloc.split(":")[0]
        if smb_hostname == ".":
            smb_hostname = None

    _config["hostname"] = [
        name for name in (server["hostname"], ssh["hostname"], smb_hostname) if name
    ]

    # Remove the URLs field if it's empty
    if not _config["urls"]:
        del _config["urls"]
    return _config | _info