azurelinuxagent/ga/periodic_operation.py (42 lines of code) (raw):

# Copyright 2020 Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Requires Python 2.6+ and Openssl 1.0+ # import datetime import time from azurelinuxagent.common import logger from azurelinuxagent.common.future import ustr class PeriodicOperation(object): ''' Instances of PeriodicOperation are tasks that are executed only after the given time period has elapsed. NOTE: the run() method catches any exceptions raised by the operation and logs them as warnings. ''' # To prevent flooding the log with error messages we report failures at most every hour _LOG_WARNING_PERIOD = datetime.timedelta(minutes=60) def __init__(self, period): self._name = self.__class__.__name__ self._period = period if isinstance(period, datetime.timedelta) else datetime.timedelta(seconds=period) self._next_run_time = datetime.datetime.utcnow() self._last_warning = None self._last_warning_time = None def run(self): try: if self._next_run_time <= datetime.datetime.utcnow(): try: logger.verbose("Executing {0}...", self._name) self._operation() finally: self._next_run_time = datetime.datetime.utcnow() + self._period except Exception as e: warning = "Error in {0}: {1} --- [NOTE: Will not log the same error for the next hour]".format(self._name, ustr(e)) if warning != self._last_warning or self._last_warning_time is None or datetime.datetime.utcnow() >= self._last_warning_time + self._LOG_WARNING_PERIOD: logger.warn(warning) self._last_warning_time = datetime.datetime.utcnow() self._last_warning = warning def next_run_time(self): return self._next_run_time def _operation(self): """ Derived classes must override this with the definition of the operation they need to perform """ raise NotImplementedError() @staticmethod def sleep_until_next_operation(operations): """ Takes a list of operations, finds the operation that should be executed next (that with the closest next_run_time) and sleeps until it is time to execute that operation. """ next_operation_time = min(op.next_run_time() for op in operations) sleep_timedelta = next_operation_time - datetime.datetime.utcnow() # timedelta.total_seconds() is not available on Python 2.6, do the computation manually sleep_seconds = ((sleep_timedelta.days * 24 * 3600 + sleep_timedelta.seconds) * 10.0 ** 6 + sleep_timedelta.microseconds) / 10.0 ** 6 if sleep_seconds > 0: time.sleep(sleep_seconds)