docker_images/system_control_app/drop.py (60 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
"""Block and unblock outgoing MQTT traffic by editing the iptables OUTPUT chain.

``disconnect_port`` appends a DROP/REJECT rule for the TCP port of a transport;
``reconnect_port`` removes every such rule again.
"""
import logging
import shutil
import subprocess

logger = logging.getLogger("system_control_app." + __name__)

mqtt_port = 8883
mqttws_port = 443

# Sentinel meaning "sudo prefix not computed yet". The computed prefix may be
# the empty string, which is falsy, so a truthiness check cannot be used to
# tell "not computed" apart from "computed, no sudo needed".
uninitialized = "uninitialized"
sudo_prefix = uninitialized

all_disconnect_types = ["DROP", "REJECT"]
all_transports = ["mqtt", "mqttws"]


def get_sudo_prefix():
    """Return the string to prepend to privileged shell commands.

    The result is ``"sudo -n "`` when a ``sudo`` executable is found on PATH
    and ``""`` otherwise. It is computed on the first call and cached in the
    module-level ``sudo_prefix``.
    """
    global sudo_prefix
    if sudo_prefix == uninitialized:
        # shutil.which avoids depending on an external `which` binary and does
        # not log an error when sudo is simply not installed.
        sudo_prefix = "sudo -n " if shutil.which("sudo") else ""
    return sudo_prefix


def run_shell_command(cmd):
    """Run a whitespace-separated command line and return its output lines.

    Args:
        cmd: command line as a single string. It is split on whitespace (no
            shell is involved, so quoting is not interpreted).

    Returns:
        The command's stdout decoded as UTF-8 and split into a list of lines.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero. The
            command, return code and output are logged before re-raising.
    """
    logger.info("running [{}]".format(cmd))
    try:
        # split() rather than split(" "): repeated spaces must not turn into
        # empty argv entries.
        output = subprocess.check_output(cmd.split())
    except subprocess.CalledProcessError as e:
        logger.error("Error spawning {}".format(e.cmd))
        logger.error("Process returned {}".format(e.returncode))
        logger.error("process output: {}".format(e.output))
        raise
    return output.decode("utf-8").splitlines()


def transport_to_port(transport):
    """Map a transport name to its TCP port.

    Args:
        transport: ``"mqtt"`` or ``"mqttws"``.

    Returns:
        ``mqtt_port`` for ``"mqtt"``, ``mqttws_port`` for ``"mqttws"``.

    Raises:
        ValueError: for any other transport name.
    """
    if transport == "mqtt":
        return mqtt_port
    elif transport == "mqttws":
        return mqttws_port
    else:
        raise ValueError(
            "transport_type {} invalid. Only mqtt and mqttws are accepted".format(
                transport
            )
        )


def disconnect_port(disconnect_type, transport):
    """Append an OUTPUT rule that blocks TCP traffic to the transport's port.

    Args:
        disconnect_type: iptables target to jump to (see
            ``all_disconnect_types``).
        transport: transport name accepted by ``transport_to_port``.

    Raises:
        ValueError: if ``transport`` is not recognised.
        subprocess.CalledProcessError: if iptables exits non-zero.
    """
    # sudo -n iptables -A OUTPUT -p tcp --dport 8883 -j DROP
    port = transport_to_port(transport)
    run_shell_command(
        "{}iptables -A OUTPUT -p tcp --dport {} -j {}".format(
            get_sudo_prefix(), port, disconnect_type
        )
    )


def _is_blocking_rule(fields, port):
    """Return True if the tokens of a listing line describe a block rule for port."""
    return (
        bool(fields)
        # Rule lines start with their line number; the "Chain ..." and column
        # header lines do not.
        and fields[0].isdigit()
        and any(kind in fields for kind in all_disconnect_types)
        # Exact token match so 443 does not match dpt:4430 or a packet counter.
        and "dpt:{}".format(port) in fields
    )


def reconnect_port(transport):
    """Delete every DROP/REJECT rule for the transport's port from OUTPUT.

    Args:
        transport: transport name accepted by ``transport_to_port``.

    Raises:
        ValueError: if ``transport`` is not recognised.
        subprocess.CalledProcessError: if iptables exits non-zero.
    """
    port = transport_to_port(transport)
    # sudo -n iptables -L OUTPUT -n -v --line-numbers
    lines = run_shell_command(
        "{}iptables -L OUTPUT -n -v --line-numbers".format(get_sudo_prefix())
    )
    # Walk the listing bottom-up: deleting an entry renumbers every entry
    # after it, so removing the highest numbers first keeps the remaining
    # line numbers from this single listing valid.
    for line in reversed(lines):
        fields = line.split()
        if not _is_blocking_rule(fields, port):
            continue
        line_number = fields[0]
        logger.info("Removing {} from [{}]".format(line_number, line))
        # sudo -n iptables -D OUTPUT 1
        run_shell_command(
            "{}iptables -D OUTPUT {}".format(get_sudo_prefix(), line_number)
        )