asyncrat/asyncrat.py (100 lines of code) (raw):
import base64
import logging
import string
from typing import cast
import requests
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from malduck import UInt32
from malduck.extractor import Extractor
from malduck.procmem import ProcessMemory
from malduck.yara import YaraRuleMatch
from ..utils import Config, get_rule_metadata
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Authorship and version metadata for this extractor module.
__author__ = "c3rb3ru5"
__version__ = "1.0.0"
class ASyncRAT(Extractor):
    """
    ASyncRAT Configuration Extractor

    Reads a table of NUL-separated strings from process memory, derives an
    AES key from one of them and decrypts the remaining entries into a
    configuration dictionary keyed by ``family``.
    """

    family: str = "asyncrat"
    yara_rules: tuple = ("asyncrat",)
    # Sizes are expressed in bits; they are divided by 8 wherever a byte
    # count is needed (IV length, derived key length).
    AES_BLOCK_SIZE = 128
    AES_KEY_SIZE = 256
    AES_CIPHER_MODE = AES.MODE_CBC
    # Seconds to wait for the optional pastebin lookup before giving up, so a
    # dead or tarpitting server cannot stall the extraction forever.
    PASTEBIN_TIMEOUT = 10

    @staticmethod
    def get_salt() -> bytes:
        """Return the fixed 32-byte salt fed to PBKDF2 in :meth:`decrypt`."""
        return bytes.fromhex("BFEB1E56FBCD973BB219022430A57843003D5644D21E62B9D4F180E7E6C33941")

    @staticmethod
    def _strip_pkcs7(data: bytes, block_len: int) -> bytes:
        """Remove a well-formed PKCS#7 padding tail from ``data``.

        The input is returned unchanged when the tail does not look like
        valid padding, so a malformed plaintext never raises here.
        """
        if not data:
            return data
        pad = data[-1]
        if 1 <= pad <= block_len and pad <= len(data) and data.endswith(bytes([pad]) * pad):
            return data[:-pad]
        return data

    def decrypt(self, key: bytes, ciphertext: bytes) -> str:
        """Decrypt one configuration blob and return it as stripped ASCII text.

        Args:
            key: Password material passed to PBKDF2 together with
                :meth:`get_salt` (50000 iterations).
            ciphertext: Raw blob. The first 32 bytes are skipped (presumably
                an authentication tag -- TODO confirm), the next block-sized
                slice is used as the IV and the remainder is the AES payload.

        Returns:
            The plaintext with padding removed, non-ASCII bytes dropped and
            surrounding whitespace stripped.

        Raises:
            ValueError: If the IV or payload length is not acceptable to AES.
        """
        key_len = self.AES_KEY_SIZE // 8
        block_len = self.AES_BLOCK_SIZE // 8
        aes_key: bytes = PBKDF2(key, self.get_salt(), key_len, 50000)
        iv_end = 32 + block_len
        cipher = AES.new(aes_key, self.AES_CIPHER_MODE, ciphertext[32:iv_end])
        padded: bytes = cipher.decrypt(ciphertext[iv_end:])
        # Drop the block padding explicitly instead of relying on strip(),
        # which only removes padding bytes that happen to be whitespace.
        plaintext: str = self._strip_pkcs7(padded, block_len).decode("ascii", "ignore").strip()
        return plaintext

    @staticmethod
    def get_string(data: list[bytes], index: int) -> str:
        """Return entry ``index`` of ``data`` as text, skipping its first byte.

        The entry is decoded as UTF-8 and undecodable bytes are ignored.
        """
        return data[index][1:].decode("utf-8", "ignore")

    def decrypt_config_item(self, key: bytes, data: list[bytes], index: int) -> str | bool:
        """Decrypt entry ``index`` and map ``"true"``/``"false"`` to booleans.

        Any other plaintext is returned unchanged as a string.
        """
        _data: bytes = base64.b64decode(self.get_string(data, index))
        plaintext = self.decrypt(key, _data)
        lowered = plaintext.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
        return plaintext

    @staticmethod
    def get_wide_string(data: list[bytes], index: int) -> str:
        """Return entry ``index`` of ``data`` decoded as little-endian UTF-16.

        The first byte of the entry is skipped and a single NUL byte is
        appended before decoding (presumably restoring the high byte consumed
        when the table was split on double NULs -- TODO confirm).

        Raises:
            UnicodeDecodeError: If the entry is not valid UTF-16.
        """
        _data: bytes = data[index][1:] + b"\x00"
        # An explicit byte order keeps the result independent of the host
        # platform; plain "utf-16" falls back to native order without a BOM.
        return _data.decode("utf-16-le")

    def decrypt_config_item_list(self, key: bytes, data: list[bytes], index: int) -> list[str]:
        """Decrypt entry ``index`` and split it on commas.

        Returns an empty list when the decrypted value is the literal
        ``"null"``.
        """
        result: str = self.decrypt_config_item_printable(key, data, index)
        if result == "null":
            return []
        return result.split(",")

    def decrypt_config_item_printable(self, key: bytes, data: list[bytes], index: int) -> str:
        """Decrypt entry ``index`` and keep only ``string.printable`` characters."""
        plaintext = self.decrypt(key, base64.b64decode(data[index][1:]))
        return "".join(ch for ch in plaintext if ch in string.printable)

    @Extractor.rule
    def asyncrat(self, p: ProcessMemory, match: YaraRuleMatch) -> Config | bool:
        """Return the metadata that ``get_rule_metadata`` derives from ``match``."""
        _info: Config = get_rule_metadata(match)
        return _info

    def _fetch_pastebin_endpoint(self, url: bytes) -> tuple[str, str] | None:
        """Download ``url`` and parse its body as ``host:port``.

        Returns ``None`` when the request fails, the status is not 200, or
        the body does not contain a colon separator.
        """
        try:
            # NOTE(security): the URL comes from the analysed sample, so this
            # is an outbound request to an attacker-chosen location.
            response = requests.get(url=url, timeout=self.PASTEBIN_TIMEOUT)
        except requests.exceptions.RequestException as error:
            log.warning(error)
            return None
        if response.status_code != 200:
            return None
        parts = response.content.split(b"\x3a")
        if len(parts) < 2:
            log.warning("pastebin response is not in host:port form")
            return None
        host = parts[0].decode("ascii", "ignore").strip()
        port = parts[1].decode("ascii", "ignore").strip()
        return host, port

    @Extractor.extractor("magic_cslr_0")
    def asyncrat_magic(self, p: ProcessMemory, addr: int) -> Config | None:
        """Extract the configuration located relative to ``addr``.

        Two 32-bit values at ``addr + 0x40`` and ``addr + 0x44`` give the
        offset and size of a string table. The table is split on double NUL
        bytes, entry 7 supplies the base64-encoded key, and fixed entry
        indexes supply the individual settings. When a pastebin URL is
        configured, it is fetched to resolve the host and port.

        Returns:
            The configuration dictionary, or ``None`` if the table cannot be
            read or decoded.
        """
        offset_raw = p.uint32v(addr + 0x40)
        size_raw = p.uint32v(addr + 0x44)
        # The reads are cast from an optional result, so guard against an
        # unreadable address instead of failing later on ``addr + None``.
        if offset_raw is None or size_raw is None:
            log.warning("unable to read strings offset/size at %#x", addr)
            return None
        strings_offset = cast(UInt32, offset_raw)
        strings_size = cast(UInt32, size_raw)
        try:
            raw: bytes = p.readv(addr + strings_offset, strings_size)
            data = raw.split(b"\x00\x00")
            key = base64.b64decode(self.get_string(data, 7))
            log.debug("extracted key: %s", str(key))
            settings = {
                "hosts": self.decrypt_config_item_list(key, data, 2),
                "ports": self.decrypt_config_item_list(key, data, 1),
                "version": self.decrypt_config_item_printable(key, data, 3),
                "install_folder": self.get_wide_string(data, 5),
                "install_file": self.get_wide_string(data, 6),
                "install": self.decrypt_config_item_printable(key, data, 4),
                "mutex": self.decrypt_config_item_printable(key, data, 8),
                # NOTE(review): stored as bytes, unlike the other fields; the
                # type is kept so existing consumers see the same shape.
                "pastebin": self.decrypt(key, base64.b64decode(data[12][1:])).encode("ascii").replace(b"\x0f", b""),
            }
        except (IndexError, ValueError) as error:
            # Missing table entries, bad base64, bad AES input lengths and
            # invalid UTF-16 all mean this match is not a usable config.
            log.warning("failed to decode configuration: %s", error)
            return None
        config = {self.family: settings}
        pastebin = settings["pastebin"]
        # The value is bytes, so it must be compared against a bytes literal.
        if pastebin and pastebin != b"null":
            endpoint = self._fetch_pastebin_endpoint(pastebin)
            if endpoint is not None:
                host, port = endpoint
                # "host" is retained for existing consumers; "hosts" is
                # updated as well so it agrees with the overwritten "ports".
                settings["host"] = host
                settings["hosts"] = [host]
                settings["ports"] = [port]
        return config