dridex/loader/dridex_loader.py (117 lines of code) (raw):
import logging
from malduck import enhex, ipv4
from malduck.extractor import Extractor
from malduck.procmem import ProcessMemory
from malduck.yara import YaraRuleMatch
from ...utils import Config, get_rule_metadata
log = logging.getLogger(__name__)
__author__ = "c3rb3ru5"
__version__ = "1.0.0"
class DridexLoader(Extractor):
"""
DridexLoader Configuration Extractor
"""
family: str = "dridex"
yara_rules: tuple = ("dridex_loader",)
LEN_BLOB_KEY = 40
LEN_BOT_KEY = 107
botnet_rva = None
botnet_id = None
rc4_key = None
ip_count = 0
@Extractor.extractor("c2parse_6")
def c2parse_6(self, p: ProcessMemory, addr: int) -> None:
self.c2_rva = p.uint32v(addr + 44)
self.botnet_rva = p.uint32v(addr - 7)
self.delta = 0
@Extractor.extractor("c2parse_5")
def c2parse_5(self, p: ProcessMemory, addr: int) -> None:
self.c2_rva = p.uint32v(addr + 75)
self.botnet_rva = p.uint32v(addr + 3)
if self.botnet_rva:
self.botnet_id = p.uint16v(self.botnet_rva)
self.delta = 0
@Extractor.extractor("c2parse_4")
def c2parse_4(self, p: ProcessMemory, addr: int) -> None:
self.c2_rva = p.uint32v(addr + 6)
self.delta = 0
@Extractor.extractor("c2parse_3")
def c2parse_3(self, p: ProcessMemory, addr: int) -> None:
self.c2_rva = p.uint32v(addr + 60)
self.delta = 2
@Extractor.extractor("c2parse_2")
def c2parse_2(self, p: ProcessMemory, addr: int) -> None:
self.c2_rva = p.uint32v(addr + 47)
self.delta = 0
@Extractor.extractor("c2parse_1")
def c2parse_1(self, p: ProcessMemory, addr: int) -> None:
self.c2_rva = p.uint32v(addr + 27)
self.delta = 2
@Extractor.extractor("botnet_id")
def get_botnet_id(self, p: ProcessMemory, addr: int) -> None:
self.botnet_rva = p.uint32v(addr + 23)
if self.botnet_rva:
self.botnet_id = p.uint16v(self.botnet_rva)
def get_rc4_rva(self, p: ProcessMemory, rc4_decode: int) -> int | None:
zb = p.uint8v(rc4_decode + 8, True)
if zb:
return p.uint32v(rc4_decode + 5)
return p.uint32v(rc4_decode + 3)
@Extractor.extractor("rc4_key_1")
def rc4_key_1(self, p: ProcessMemory, addr: int) -> None:
self.rc4_key = self.get_rc4_rva(p, addr)
@Extractor.extractor("rc4_key_2")
def rc4_key_2(self, p: ProcessMemory, addr: int) -> None:
self.rc4_key = self.get_rc4_rva(p, addr)
@Extractor.extractor("ip_count_1")
def ip_count_1(self, p: ProcessMemory, addr: int) -> None:
ip_count_rva = p.uint32v(addr + 3)
if ip_count_rva:
self.ip_count = p.uint8v(ip_count_rva)
@Extractor.extractor("ip_count_2")
def ip_count_2(self, p: ProcessMemory, addr: int) -> None:
ip_count_rva = p.uint32v(addr + 2)
if ip_count_rva:
self.ip_count = p.uint8v(ip_count_rva)
@Extractor.extractor("ip_count_3")
def ip_count_3(self, p: ProcessMemory, addr: int) -> None:
ip_count_rva = p.uint32v(addr + 2)
if ip_count_rva:
self.ip_count = p.uint8v(ip_count_rva)
@staticmethod
def match_exists(matches, name):
for element in matches.elements:
if element == name:
return True
return False
@Extractor.rule
def dridex_loader(self, p: ProcessMemory, match: YaraRuleMatch) -> Config | bool:
_info: Config = get_rule_metadata(match)
return _info
@Extractor.needs_pe
@Extractor.final
def dridex_loader_final(self, p: ProcessMemory) -> dict | None:
if p.memory:
config = {
"family": self.family,
}
if not self.ip_count or self.ip_count > 10:
return None
log.debug("ip_count: %d", self.ip_count)
config[self.family] = {"hosts": []}
if self.c2_rva:
for i in range(0, self.ip_count):
ip = None
port = None
ip = ipv4(p.readv(self.c2_rva, 4))
port = p.uint16v(self.c2_rva + 4)
log.debug("found c2 ip: " + str(ip) + ":" + str(port))
if ip is not None and port is not None:
config[self.family]["hosts"].append(str(ip) + ":" + str(port))
self.c2_rva += 6 + self.delta
if len(config[self.family]["hosts"]) <= 0:
return None
if self.rc4_key:
config["rc4_key"] = enhex(self.rc4_key)
if self.botnet_id is not None:
log.debug("found botnet_id: " + str(self.botnet_id))
config[self.family]["botnet_id"] = self.botnet_id
return config
return None