docker_images/pythonv2/wrapper/python_glue/internal_control_glue.py (41 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import logging
import leak_check
import os
import gc
import platform

logger = logging.getLogger(__name__)

# Module-level flag updated by set_flags_sync() when the caller supplies
# a "test_async" entry.
do_async = False

# Leak tracker shared by this module. The baseline is taken once, at import
# time, and send_command_sync("check_for_leaks") checks against it.
tracker = leak_check.LeakTracker()
tracker.add_tracked_module("azure.iot.device")
tracker.set_baseline()


def log_message_sync(msg):
    """Print a log message to stdout.

    Args:
        msg: Either a dict carrying the text under the "message" key, or any
            other object, which is printed via ``str()``.
    """
    if isinstance(msg, dict) and "message" in msg:
        print(msg["message"])
    else:
        print(str(msg))


def set_flags_sync(flags):
    """Apply the flags that are present in *flags* to this module's state.

    Only keys that are actually present are applied; absent keys leave the
    current value untouched.

    Args:
        flags: Container of flag names to values (membership test and item
            lookup are used). Currently only "test_async" is consulted.
    """
    global do_async

    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logger.info("setting flags to %s", flags)
    # Resist the temptation to use getattr.  We don't want to change flags
    # that aren't populated.
    if "test_async" in flags:
        do_async = flags["test_async"]


def get_capabilities_sync():
    """Return the capability flags advertised by this wrapper.

    Returns:
        A dict with a single "flags" key mapping capability names to booleans.
    """
    caps = {
        "flags": {
            "v2_connect_group": True,
            "system_control_app": True,
            "checks_for_leaks": True,
        }
    }
    return caps


def send_command_sync(cmd):
    """Execute a control command.

    Args:
        cmd: Command name. Only "check_for_leaks" is supported.

    Raises:
        Exception: If *cmd* is not a supported command. Anything raised by
            ``tracker.check_for_new_leaks()`` also propagates to the caller.
    """
    if cmd == "check_for_leaks":
        # If you temporarily comment this out, uncomment the log line so you
        # don't spend many hours tracking down a bug that should have been
        # caught here when you don't remember that you commented this out.
        # Not that this has ever happened to me.
        tracker.check_for_new_leaks()
        # logger.info("NOT CHECKING FOR LEAKS")
    else:
        raise Exception("Unsupported Command")


def get_wrapper_stats_sync():
    """Return basic runtime statistics about this wrapper process.

    Returns:
        A dict with the language name, the Python version string, the number
        of objects currently tracked by the garbage collector, and the
        process id.
    """
    return {
        "language": "python",
        "languageVersion": platform.python_version(),
        "wrapperGcObjectCount": len(gc.get_objects()),
        "wrapperPid": os.getpid(),
    }