elasticapm/utils/threading.py (54 lines of code) (raw):

# BSD 3-Clause License # # Copyright (c) 2019, Elasticsearch BV # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE from __future__ import absolute_import import logging import os import threading from timeit import default_timer logger = logging.getLogger("elasticapm.utils.threading") class IntervalTimer(threading.Thread): """ A timer that runs a function repeatedly. In contrast to threading.Timer, IntervalTimer runs the given function in perpetuity or until it is cancelled. When run, it will wait `interval` seconds until the first execution. """ def __init__( self, function, interval, name=None, args=(), kwargs=None, daemon=None, evaluate_function_interval=False ) -> None: """ :param function: the function to run :param interval: the interval in-between invocations of the function, in milliseconds :param name: name of the thread :param args: arguments to call the function with :param kwargs: keyword arguments to call the function with :param daemon: should the thread run as a daemon :param evaluate_function_interval: if set to True, and the function returns a number, it will be used as the next interval """ super(IntervalTimer, self).__init__(name=name, daemon=daemon) self.daemon = daemon self._args = args self._kwargs = kwargs if kwargs is not None else {} self._function = function self._interval = interval self._interval_done = threading.Event() self._evaluate_function_interval = evaluate_function_interval def run(self) -> None: execution_time = 0 interval_override = None while True: real_interval = (interval_override if interval_override is not None else self._interval) - execution_time interval_completed = True if real_interval: interval_completed = self._interval_done.wait(real_interval) if interval_completed: # we've been cancelled, time to go home return start = default_timer() try: rval = self._function(*self._args, **self._kwargs) if self._evaluate_function_interval and isinstance(rval, (int, float)): interval_override = rval else: interval_override = None except Exception: logger.error("Exception in interval timer function", exc_info=True) execution_time = default_timer() - start def cancel(self) -> None: self._interval_done.set() class ThreadManager(object): def __init__(self) -> None: self.pid = None self.start_stop_order = 100 def start_thread(self, pid=None) -> None: if not pid: pid = os.getpid() self.pid = pid def stop_thread(self): raise NotImplementedError() def is_started(self, current_pid=None): if not current_pid: current_pid = os.getpid() return self.pid == current_pid