cortado/rtas/schtask_escalation.py (40 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

# Name: Scheduled Task Privilege Escalation
# RTA: schtask_escalation.py
# signal.rule.name: Local Scheduled Task Commands
# signal.rule.name: Whoami Process Activity
# signal.rule.name: Svchost spawning Cmd
# signal.rule.name: Net command via SYSTEM account
# ATT&CK: T1053

import logging
import time
from pathlib import Path
from typing import Any, Optional

from . import OSType, RuleMetadata, _common, register_code_rta

log = logging.getLogger(__name__)


def execute_schtasks(args: list[str], kwargs: Optional[dict[str, Any]] = None) -> int:
    """Run ``schtasks.exe`` with the given arguments.

    Args:
        args: Command-line arguments appended after ``schtasks.exe``.
        kwargs: Optional keyword arguments forwarded unchanged to
            ``_common.execute_command``. ``None`` (the default) forwards none.

    Returns:
        The first element of the 3-tuple returned by
        ``_common.execute_command``, used by callers as the process return
        code (``0`` is treated as success).
    """
    # A ``None`` default avoids sharing one mutable dict across all calls.
    extra_kwargs = kwargs if kwargs is not None else {}
    retcode, _, _ = _common.execute_command(["schtasks.exe", *args], **extra_kwargs)
    return retcode


@register_code_rta(
    id="1a61241e-5b1b-44ec-8c9f-3ae4652550be",
    name="schtask_escalation",
    platforms=[OSType.WINDOWS],
    endpoint_rules=[],
    siem_rules=[
        RuleMetadata(id="afcce5ad-65de-4ed2-8516-5e093d3ac99a", name="Local Scheduled Task Creation"),
        RuleMetadata(id="ef862985-3f13-4262-a686-5f357bbb9bc2", name="Whoami Process Activity"),
        RuleMetadata(id="fd7a6052-58fa-4397-93c3-4795249ccfa2", name="Svchost spawning Cmd"),
    ],
    techniques=["T1033", "T1053", "T1059"],
)
def main():
    """Create, run and remove a scheduled task that runs ``whoami`` as SYSTEM.

    Steps:
        1. Delete any existing task named ``test-task-rta``.
        2. Create the task with ``/ru system`` and an ``onlogon`` schedule; its
           command redirects ``whoami.exe`` output to ``task.log`` (resolved
           against the current working directory).
        3. Trigger the task with ``/run``; on success, print and then remove
           the output file.
        4. Always delete the task once it has been created.

    Returns early, after logging a message, if the task cannot be created.
    """
    log.info("Scheduled Task Privilege Escalation")

    task_name = "test-task-rta"
    file_path = Path("task.log").resolve()
    # NOTE(review): file_path is interpolated unquoted; a working directory
    # containing spaces may break the redirect target -- confirm.
    command = f"cmd.exe /c whoami.exe > {file_path}"

    # Delete the task if it exists
    retcode = execute_schtasks(["/query", "/tn", task_name])
    if retcode == 0:
        _ = execute_schtasks(["/delete", "/tn", task_name, "/f"])

    retcode = execute_schtasks(["/create", "/tn", task_name, "/ru", "system", "/tr", command, "/sc", "onlogon"])
    if retcode != 0:
        log.info("Error creating task")
        return

    # Run the task and grab the file. The ``finally`` guarantees the task is
    # removed even if printing or deleting the output file raises, so no
    # SYSTEM-level task is left registered on the host.
    try:
        retcode = execute_schtasks(["/run", "/tn", task_name])
        if retcode == 0:
            # Presumably gives the asynchronously started task time to write
            # its output before the file is read.
            time.sleep(1)
            _common.print_file(file_path)
            time.sleep(1)
            _common.remove_file(file_path)
    finally:
        _ = execute_schtasks(["/delete", "/tn", task_name, "/f"])