cortado/rtas/_common.py (429 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import binascii
import contextlib
import errno
import getpass
import logging
import os
import platform
import re
import shutil
import socket
import subprocess
import sys
import threading
import time
import typing
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import Any
from cortado.rtas import OSType
from cortado.rtas._const import (
ACCESS_DENIED_RETURNCODE,
PS_EXEC_EXE,
REG_HKCR,
REG_HKCU,
REG_HKLM,
REG_HKU,
RTA_SUBPROCESS_TIMEOUT_RETURNCODE,
)
log = logging.getLogger(__name__)
# Amount of seconds a command should take at a minimum.
# This can allow for arbitrary slow down of scripts
MIN_EXECUTION_TIME = 0
DEFAULT_SUBPROCESS_TIMEOUT_SECS = 40
HOSTS_TO_PROCESS_CAP = 64
class ExecutionError(Exception):
pass
## OS details
def get_current_user() -> str:
return getpass.getuser().lower()
def get_current_dir():
return Path(__file__).resolve().parent
def get_hostname():
return socket.gethostname()
def get_host_ip() -> str:
try:
return socket.gethostbyname(get_hostname())
except socket.gaierror:
pass
return "127.0.0.1"
def resolve_hostname(hostname: str):
return socket.gethostbyname(hostname)
def is_system() -> bool:
user_name = get_current_user()
return user_name == "system" or user_name.endswith("$")
def get_current_os() -> OSType:
if sys.platform == "darwin":
return OSType.MACOS
elif sys.platform.startswith("win"):
return OSType.WINDOWS
else:
return OSType.LINUX
def is_64bit():
arch_env_var = "PROCESSOR_ARCHITECTURE"
return os.environ.get(arch_env_var, "") in ("x64", "AMD64")
def get_cmd_path() -> str:
"""Get OS-specific path for a command executable"""
current_os = get_current_os()
if current_os == OSType.WINDOWS:
cmd_path = os.environ.get("COMSPEC")
if not cmd_path:
raise ValueError("Can't get COMSPEC env var value")
return cmd_path
return "/bin/sh"
def get_current_exec_args():
script_path = os.path.abspath(sys.argv[0])
return [sys.executable, script_path] + sys.argv[1:]
def get_winreg():
import winreg
return winreg
## Resource utilities
def get_resource_path(path: str | Path) -> Path:
"""Resolve relative path to a resource file into an absolute OS-specific path"""
current_dir = Path(__file__).resolve().parent
return current_dir / path
## File utilities
def create_file_with_data(path: Path | str, data: str | bytes) -> None:
data_bytes = data.encode("utf-8") if isinstance(data, str) else data
_ = Path(path).write_bytes(data_bytes)
@contextlib.contextmanager
def file_with_data(path: str | Path, data: str | bytes):
data_bytes = data.encode("utf-8") if isinstance(data, str) else data
with open(path, "wb+") as f:
_ = f.write(data_bytes)
_ = f.seek(0)
yield f
def copy_file(source: str | Path, target: str | Path):
log.info(f"Copying `{source}` to `{target}`")
shutil.copy(source, target)
def patch_file_with_bytes(
source_file: Path | str, old_bytes: bytes, new_bytes: bytes, target_file: Path | str | None = None
):
target_file = target_file or source_file
log.info(
f"Patching `{source_file}`, replacing `{binascii.b2a_hex(old_bytes)}` bytes with "
f"`{binascii.b2a_hex(new_bytes)}` bytes, and saving as `{target_file}`"
)
source = Path(source_file)
data = source.read_bytes()
patched_data = data.replace(old_bytes, new_bytes)
_ = Path(target_file).write_bytes(patched_data)
def patch_file_with_regex(
source_file: Path | str, regex: bytes | str, new_data: bytes | str, target_file: Path | str | None = None
):
target_file = target_file or source_file
data = new_data if isinstance(new_data, bytes) else new_data.encode("utf-8")
log.info(
f"Patching `{source_file}`, replaving matches to `{regex}` regex with "
f"`{binascii.b2a_hex(data)}` bytes, and saving as `{target_file}`"
)
regex_bytes = regex if isinstance(regex, bytes) else regex.encode("utf-8")
contents = Path(source_file).read_bytes()
matches = re.findall(regex_bytes, contents)
if not matches:
log.warning("No regex matches found")
return
contents = re.sub(regex_bytes, data, contents)
_ = Path(target_file).write_bytes(contents)
# FIXME: might be not needed?
def link_file(source: str | Path, target: str | Path):
log.info(f"Symlinking `{source}` to `{target}`")
Path(source).symlink_to(Path(target))
def remove_file(path: str | Path):
p = Path(path)
if p.is_file():
log.info(f"Removing file `{p}`")
p.unlink()
def remove_files(paths: list[str | Path]):
for path in paths:
remove_file(path)
def remove_directory(path: str | Path):
p = Path(path)
if p.is_dir():
log.info(f"Removing directory `{path}`")
p.rmdir()
else:
remove_file(p)
## Process exec utilities
def execute_command(
command_args: str | list[Any],
timeout_secs: float | int = DEFAULT_SUBPROCESS_TIMEOUT_SECS,
capture_output: bool = False,
ignore_failures: bool = False,
ignore_timeout: bool = True,
shell: bool = False,
stdin_data: str | bytes | None = None,
env_vars: dict[str, str] | None = None,
) -> tuple[int, bytes | None, bytes | None]:
# NOTE: `list2cmdline` is an internal function, so it might break in the future
# https://github.com/python/cpython/blob/4bb1dd3c5c14338c9d9cea5988431c858b3b76e0/Lib/subprocess.py#L66
if isinstance(command_args, list):
command_args = [str(a) for a in command_args]
command_str = subprocess.list2cmdline(command_args)
else:
command_str = command_args
user_name = get_current_user()
hostname = get_hostname()
log.info(f"Executing command as `{user_name}` at `{hostname}`: `{command_str}`")
start = time.time()
try:
result = subprocess.run(
command_args,
input=stdin_data,
stdout=subprocess.PIPE if capture_output else subprocess.DEVNULL,
stderr=subprocess.PIPE if capture_output else subprocess.DEVNULL,
capture_output=capture_output,
timeout=timeout_secs,
shell=shell,
env=env_vars,
check=True,
start_new_session=True,
)
except subprocess.CalledProcessError as e:
log.error(f"Error while executing command in a subprocess: {e}")
if ignore_failures:
return e.returncode, e.stdout, e.stderr
raise ExecutionError("Subprocess command execution failed", e)
except subprocess.TimeoutExpired:
log.error(f"Subprocess command timed out. timeout_secs={timeout_secs}")
if ignore_timeout:
return RTA_SUBPROCESS_TIMEOUT_RETURNCODE, None, None
raise ExecutionError("Subprocess command timed out")
run_time = time.time() - start
log.info(f"Command executed successfully. Return code = {result.returncode}, exec time = {run_time} secs")
return result.returncode, result.stdout, result.stderr
def create_macos_masquerade(masquerade: str):
if platform.processor() == "arm":
name = "com.apple.ditto_and_spawn_arm"
else:
name = "com.apple.ditto_and_spawn_intel"
source = get_resource_path(f"bin/{name}")
copy_file(source, masquerade)
def clear_web_cache(sleep_secs: int = 1):
log.info("Clearing temporary web cache files")
_ = execute_command(["RunDll32.exe", "InetCpl.cpl,", "ClearMyTracksByProcess", "8"])
time.sleep(sleep_secs)
## HTTP server
def serve_dir_over_http(
ip: str | None = None, port: int | None = None, dir_path: Path | None = None
) -> tuple[HTTPServer, str, int]:
handler = SimpleHTTPRequestHandler
dir_path = dir_path or get_current_dir()
ip = ip or get_host_ip()
server = None
if port:
server = HTTPServer((ip, port), handler)
else:
# Otherwise, try to find a port that's available
for port in range(8000, 9000):
try:
server = HTTPServer((ip, port), handler)
break
except socket.error:
pass
if not port or not server:
raise ExecutionError("Can't start a web server")
def server_thread():
os.chdir(dir_path)
log.info(f"Starting web server on http://{ip}:{port:d} for directory {dir_path}")
server.serve_forever()
# Start this thread in the background
thread = threading.Thread(target=server_thread, daemon=True)
thread.start()
return server, ip, port
## Unilities
def as_wchar(s: str) -> bytes:
return s.encode("utf-16le")
def find_writeable_directory(base_dir: str | Path):
base_dir_path = Path(base_dir)
for dirpath, dirnames, _ in base_dir_path.walk():
for subdir_name in dirnames:
subdir_path = dirpath / subdir_name
test_file = subdir_path / "test_file"
try:
_ = test_file.write_bytes(b"test")
return subdir_path
except PermissionError:
pass
finally:
test_file.unlink()
def elevate_to_system(arguments: list[str] | None = None) -> bool:
if is_system():
return True
arguments = arguments or get_current_exec_args()
ps_exec_path = get_resource_path(PS_EXEC_EXE)
log.info("Attempting to elevate to SYSTEM using PsExec")
if not ps_exec_path.is_file():
log.error("PsExec not found")
raise ExecutionError(f"PsExec not found at `{ps_exec_path}`")
returncode, _, _ = execute_command([str(ps_exec_path), "-w", os.getcwd(), "-accepteula", "-s"] + arguments)
if returncode == ACCESS_DENIED_RETURNCODE:
log.error("Failed to elevate to SYSTEM")
return False
return True
## Registry utilities
def write_to_registry(
hive: str,
key: str,
value: str,
data: str | int | list[str | int],
data_type: str | int = "sz",
restore: bool = True,
pause: bool = False,
append: bool = False,
) -> None:
with temp_registry_value(hive, key, value, data, data_type, restore, pause, append):
pass
@contextlib.contextmanager
def temp_registry_value(
hive_name: str,
key: str,
value: str,
data: str | int | list[str | int],
data_type: str | int = "sz",
restore: bool = True,
pause: bool = False,
append: bool = False,
):
winreg = get_winreg()
pre_restore_sleep_secs = 0.5
post_changes_pause_sleep_secs = 0.5
hives: dict[str, Any] = {
REG_HKLM: winreg.HKEY_LOCAL_MACHINE, # type: ignore
REG_HKCU: winreg.HKEY_CURRENT_USER, # type: ignore
REG_HKU: winreg.HKEY_USERS, # type: ignore
REG_HKCR: winreg.HKEY_CLASSES_ROOT, # type: ignore
}
hive = hives[hive_name]
if isinstance(data_type, str):
attr = "REG_" + data_type.upper()
data_type = getattr(winreg, attr)
else:
data_type = data_type or winreg.REG_SZ # type: ignore
key = key.rstrip("\\")
hkey = winreg.CreateKey(hive, key) # type: ignore
old_data = None
old_type = None
try:
# check if the key already exists
old_data, old_type = winreg.QueryValueEx(hkey, value) # type: ignore
except OSError as e:
# Check if the error is "No such file or directory"
if e.errno != errno.ENOENT:
raise
key_exists = old_type is not None
if append and key_exists:
# If appending to the existing REG_MULTI_SZ key, then append to the end
if not isinstance(data, list):
data = [data]
if isinstance(old_data, list):
data = old_data + data
data_string = ",".join(data) if isinstance(data, list) else data # type: ignore
log.info(f"Writing to registry: key=`{key}`, value=`{value}`, data=`{data_string}`")
winreg.SetValueEx(hkey, value, 0, data_type, data) # type: ignore
stored_data, _ = winreg.QueryValueEx(hkey, value) # type: ignore
if data != stored_data:
log.warning(f"Wrote `{data}` to registry at `{hkey}` but retrieved `{stored_data}`")
# Allow code to execute within the context manager 'with'
try:
yield
finally:
if restore:
time.sleep(pre_restore_sleep_secs)
if key_exists:
# Otherwise restore the value
old_data_string = ",".join(old_data) if isinstance(old_data, list) else old_data # type: ignore
log.info(f"Restoring registry value to: key=`{key}`, value=`{value}`, data=`{old_data_string}`")
winreg.SetValueEx(hkey, value, 0, old_type, old_data) # type: ignore
else:
# If it didn't already exist, then delete it
log.info(f"Deleting from registry: key=`{key}`, value=`{value}`")
winreg.DeleteValue(hkey, value) # type: ignore
hkey.Close() # type: ignore
if pause:
time.sleep(post_changes_pause_sleep_secs)
def enable_logon_audit(host: str = "localhost", verbose: bool = True, sleep_secs: int = 2) -> bool:
"""Enable logon auditing on local or remote system to enable 4624 and 4625 events."""
if verbose:
log.info(f"Ensuring audit logging is enabled on {host}")
auditpol_cmd = "auditpol.exe /set /subcategory:Logon /failure:enable /success:enable"
enable_logging_cmd = (
f"Invoke-WmiMethod -ComputerName {host} -Class Win32_process -Name create -ArgumentList '{auditpol_cmd}'"
)
command = ["powershell", "-c", enable_logging_cmd]
retcode, _, stderr = execute_command(command)
# additional time to allow auditing to process
time.sleep(sleep_secs)
if retcode != 0:
error = stderr or ""
log.error(f"Error while enabling logon audit: `{error}`")
return False
return True
def print_file(path: str | Path):
file_path = Path(path)
if not file_path.is_file():
print(f"File `{path}` is not found")
return
print(f"Contents of `{path}`:")
data = file_path.read_bytes()
print(data.rstrip())
## Windows utils
@typing.no_type_check
def get_process_pid(pname: str) -> int | None:
import ctypes
TH32CS_SNAPPROCESS = 0x00000002
DWORD = ctypes.c_uint32
LONG = ctypes.c_int32
NULL_T = ctypes.c_void_p
TCHAR = ctypes.c_char
MAX_PATH = 260
class PROCESSENTRY32(ctypes.Structure):
_fields_ = [
("dwSize", DWORD),
("cntUsage", DWORD),
("th32ProcessID", DWORD),
("th32DefaultHeapID", NULL_T),
("th32ModuleID", DWORD),
("cntThreads", DWORD),
("th32ParentProcessID", DWORD),
("pcPriClassBase", LONG),
("dwFlags", DWORD),
("szExeFile", TCHAR * MAX_PATH),
]
CreateToolhelp32Snapshot = ctypes.windll.kernel32.CreateToolhelp32Snapshot
Process32First = ctypes.windll.kernel32.Process32First
Process32Next = ctypes.windll.kernel32.Process32Next
CloseHandle = ctypes.windll.kernel32.CloseHandle
hProcessSnap = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
pe32 = PROCESSENTRY32()
pe32.dwSize = ctypes.sizeof(PROCESSENTRY32)
if Process32First(hProcessSnap, ctypes.byref(pe32)) == 0:
log.info("Failed getting first process")
return
while True:
procname = pe32.szExeFile.decode("utf-8").lower()
if pname.lower() in procname:
CloseHandle(hProcessSnap)
return pe32.th32ProcessID
if not Process32Next(hProcessSnap, ctypes.byref(pe32)):
CloseHandle(hProcessSnap)
return None
@typing.no_type_check
def inject_shellcode(path: Path, shellcode: bytes):
import ctypes
import ctypes.wintypes
from ctypes import windll
from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCVOID, LPVOID
import win32process
# created suspended process
info = win32process.CreateProcess(None, path, None, None, False, 0x04, None, None, win32process.STARTUPINFO())
page_rwx_value = 0x40
memcommit = 0x00001000
class _SECURITY_ATTRIBUTES(ctypes.Structure):
_fields_ = [
("nLength", DWORD),
("lpSecurityDescriptor", LPVOID),
("bInheritHandle", BOOL),
]
LPSECURITY_ATTRIBUTES = ctypes.POINTER(_SECURITY_ATTRIBUTES)
LPTHREAD_START_ROUTINE = LPVOID
if info[0].handle > 0:
log.info(f"Created {path} Suspended")
shellcode_length = len(shellcode)
process_handle = info[0].handle # phandle
VirtualAllocEx = windll.kernel32.VirtualAllocEx
VirtualAllocEx.restype = LPVOID
VirtualAllocEx.argtypes = (HANDLE, LPVOID, DWORD, DWORD, DWORD)
WriteProcessMemory = ctypes.windll.kernel32.WriteProcessMemory
WriteProcessMemory.restype = BOOL
WriteProcessMemory.argtypes = (HANDLE, LPVOID, LPCVOID, DWORD, DWORD)
CreateRemoteThread = ctypes.windll.kernel32.CreateRemoteThread
CreateRemoteThread.restype = HANDLE
CreateRemoteThread.argtypes = (HANDLE, LPSECURITY_ATTRIBUTES, DWORD, LPTHREAD_START_ROUTINE, LPVOID, DWORD, DWORD)
# allocate RWX memory
lpBuffer = VirtualAllocEx(process_handle, 0, shellcode_length, memcommit, page_rwx_value)
log.info(f"Allocated remote memory at {hex(lpBuffer)}")
# write shellcode in allocated memory
res = WriteProcessMemory(process_handle, lpBuffer, shellcode, shellcode_length, 0)
if res > 0:
log.info("Shellcode written")
# create remote thread to start shellcode execution
CreateRemoteThread(process_handle, None, 0, lpBuffer, 0, 0, 0)
log.info("Shellcode Injection, done")
def configure_logging(logging_level: int = logging.DEBUG, root_logger_name: str = "cortado.rtas"):
"""Configure logging level and log output format for RTAs root logger.
By default, logging level is set to DEBUG. The logs are printed to stderr as plain text.
"""
logging.basicConfig(
format="%(asctime)s %(name)s %(levelname)-6s %(message)s",
level=logging_level,
datefmt="%Y-%m-%d %H:%M:%S",
)