cortado/rtas/lateral_commands.py (66 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
# Name: Lateral Movement Commands
# RTA: lateral_commands.py
# Elatic Detection: Local Service Commands
# signal.rule.name: Local Scheduled Task Commands
# signal.rule.name: Whoami Process Activity
# ATT&CK: T1021, T1047, T1077, T1124, T1126
# Description: Runs various Windows commands typically used by attackers to move laterally from the local machine.
import logging
import os
import re
from . import OSType, RuleMetadata, _common, _const, register_code_rta
log = logging.getLogger(__name__)
MY_APP_EXE = "bin/myapp.exe"
@register_code_rta(
id="389392dc-61db-4e45-846f-099f7d289c1b",
name="lateral_commands",
platforms=[OSType.WINDOWS],
endpoint_rules=[],
siem_rules=[RuleMetadata(id="d61cbcf8-1bc1-4cff-85ba-e7b21c5beedc", name="Service Command Lateral Movement")],
techniques=["T1569", "T1021", "T1543"],
ancillary_files=[MY_APP_EXE],
)
def main():
remote_host = _common.get_host_ip()
log.info("Attempting to laterally move to %s" % remote_host)
remote_host = _common.resolve_hostname(remote_host)
log.info("Using ip address %s" % remote_host)
# Put the hostname in quotes for WMIC, but leave it as is
if not re.match(_const.IP_REGEX, remote_host):
wmi_node = f'"{remote_host}"'
else:
wmi_node = remote_host
commands = [
"sc.exe \\\\{host} create test_service binPath= %s" % MY_APP_EXE,
"sc.exe \\\\{host} config test_service binPath= c:\\windows\\system32\\ipconfig.exe",
"sc.exe \\\\{host} failure test_service command= c:\\windows\\system32\\net.exe",
"sc.exe \\\\{host} start test_service",
"sc.exe \\\\{host} delete test_service",
"wmic.exe /node:{wmi_node} process call create ipconfig.exe",
"wmic.exe /node:{wmi_node} path WIN32_USERACCOUNT where(name='vagrant') set passwordexpires='false'",
"net.exe time \\\\{host}",
"net.exe use \\\\{host}\\admin$",
"net.exe use \\\\{host}\\admin$ /delete",
"net.exe use \\\\{host}\\c$",
"net.exe use \\\\{host}\\c$ /delete",
]
for command in commands:
formatted_command = command.format(host=remote_host, wmi_node=wmi_node)
_ = _common.execute_command(formatted_command, shell=True)
_, whoami, _ = _common.execute_command("whoami", shell=True)
_, hostname, _ = _common.execute_command("hostname", shell=True)
if not whoami or not hostname:
raise _common.ExecutionError("Can't get `whoami` or `hostname` command results")
whoami = whoami.lower()
separator = "\\\\"
domain, _, _ = whoami.partition(separator) # type: ignore
hostname = hostname.lower()
schtasks_host = remote_host
# Check if the account is local or a domain
if domain in (hostname, "NT AUTHORITY"):
log.info(
"Need password for remote scheduled task in workgroup. Performing instead on %s." % _common.get_host_ip()
)
schtasks_host = _common.get_host_ip()
task_name = "test_task-%d" % os.getpid()
schtask_commands = [
r"schtasks /s {host} /delete /tn {name} /f",
r"schtasks /s {host} /create /SC MONTHLY /MO first /D SUN /tn {name} /tr c:\windows\system32\ipconfig.exe /f",
r"schtasks /s {host} /run /tn {name}",
r"schtasks /s {host} /delete /tn {name} /f",
]
for command in schtask_commands:
command = command.format(host=schtasks_host, name=task_name)
_ = _common.execute_command(command, shell=True)
# Remote powershell
_ = _common.execute_command(["C:\\Windows\\system32\\wsmprovhost.exe", "-Embedding"], timeout_secs=5)