fbnet/command_runner/base_service.py (112 lines of code) (raw):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import abc
import asyncio
import logging
import sys
from enum import Enum

from fbnet.command_runner.exceptions import NotImplementedErrorException


def _loop_kwargs(loop):
    """
    Build the ``loop=`` keyword argument for asyncio primitives.

    The explicit ``loop`` parameter of ``asyncio.Condition`` and
    ``asyncio.sleep`` was removed in Python 3.10; passing it there raises
    ``TypeError``. On older interpreters the argument is still forwarded so
    that objects stay bound to the loop selected by the service.
    """
    if sys.version_info < (3, 10):
        return {"loop": loop}
    return {}


class State(Enum):
    """Life-cycle states a ``ServiceTask`` moves through."""

    CREATE = 0  # set in ServiceTask.__init__ before anything else happens
    INIT = 1  # set once the task object has been initialised
    RUN = 2  # set when ServiceTask.start() begins executing
    CANCELED = 3  # set when start() observes asyncio.CancelledError
    STOP = 4  # set at the very end of start(), after cleanup


class ServiceObjMeta(abc.ABCMeta):
    """
    A meta class for handling common initializations.

    Every class created through this meta class is recorded in
    ``_ALL_OBJTYPES`` so that counters can later be registered for all of
    them in one call.
    """

    # Every class object created with this meta class, in creation order.
    _ALL_OBJTYPES = []

    def __new__(cls, name, bases, attrs):
        """Create the class and remember it in ``_ALL_OBJTYPES``."""
        objtype = super().__new__(cls, name, bases, attrs)
        ServiceObjMeta._ALL_OBJTYPES.append(objtype)
        return objtype

    @staticmethod
    def register_all_counters(stats_mgr):
        """Call ``register_counters(stats_mgr)`` on every recorded class."""
        for objtype in ServiceObjMeta._ALL_OBJTYPES:
            objtype.register_counters(stats_mgr)


class ServiceObj(metaclass=ServiceObjMeta):
    """
    Common base-class for all application objects.

    * takes care of common initialization to provide a consistent view across
      objects
    """

    def __init__(self, service, name=None):
        """
        Args:
            service: owning service object, or a falsy value. When given, its
                ``loop`` attribute is used as this object's event loop;
                otherwise ``asyncio.get_event_loop()`` is used.
            name: object name; defaults to the class name.
        """
        self._service = service
        self._loop = service.loop if service else asyncio.get_event_loop()
        self._objname = name or self.__class__.__name__
        self._logger = self.create_logger()

    @property
    def loop(self):
        """Event loop this object is associated with."""
        return self._loop

    @property
    def objname(self):
        """Name of this object (used for logging and task registration)."""
        return self._objname

    @property
    def service(self):
        """The service object passed to the constructor (may be falsy)."""
        return self._service

    @property
    def logger(self):
        """Logger created by ``create_logger()`` at construction time."""
        return self._logger

    def create_logger(self):
        """Return the logger named ``"fcr." + objname``."""
        return logging.getLogger("fcr." + self.objname)

    def inc_counter(self, counter):
        """Increment ``counter`` if the service has a stats manager."""
        if self.service and self.service.stats_mgr:
            self.service.stats_mgr.incrementCounter(counter)

    @classmethod
    def register_counters(cls, stats_mgr):
        """Hook for subclasses to register their counters; no-op here."""
        pass


class ServiceTask(ServiceObj):
    """
    Base class for defining a Service Task in FCR.

    This takes care of common functionality for a service

    * make sure exceptions are properly handled
    * Each service only needs to implement the 'run()' method to add the
      business logic
    * Each service can optionally implement the 'cleanup()' method to free up
      the resources
    * Logging as service transitions through various stages.
    """

    # Store reference to tasks that are currently running. This is mainly used
    # in unit-tests and for debugging
    _ALL_TASKS = {}

    def __init__(self, service, name=None, executor=None):
        """
        Create the task and schedule ``start()`` on the event loop.

        Args:
            service: owning service object (see ``ServiceObj``).
            name: task name; defaults to the class name. It is also the key
                under which the task is registered in ``_ALL_TASKS``.
            executor: optional executor used by ``_run_in_executor()``;
                ``None`` selects the loop's default executor.
        """
        super().__init__(service, name)

        self._state = State.CREATE

        # A Task may want to run blocking calls in separate thread. To run a
        # method in separate thread, task can use the _run_in_executor() method.
        # User can create their own executor instead using the default one
        # created by the asyncio. This allows user control over the type of
        # executor (task/threads) and its properties (e.g. num_workers)
        self._executor = executor

        # _update_event can be used to notify coroutines about the change in
        # state in this service. e.g. run() has completed
        self._update_event = asyncio.Condition(**_loop_kwargs(self.loop))

        self.set_state(State.INIT)

        coro = self.start()
        # fixup task name to show actual task in logs
        coro.__qualname__ = self._objname
        self._task = asyncio.ensure_future(coro, loop=self.loop)

        self._ALL_TASKS[self._objname] = self

    @classmethod
    def all_tasks(cls):
        """Return the ``(name, task)`` items view of the task registry."""
        return cls._ALL_TASKS.items()

    def __await__(self):
        """Awaiting the task object is equivalent to awaiting ``wait()``."""
        yield from self.wait().__await__()

    async def _run_in_executor(self, method, *args):
        """Run ``method(*args)`` in the task's executor and return its result."""
        return await self.loop.run_in_executor(self._executor, method, *args)

    async def wait(self):
        """
        Block until the next notification issued by ``_notify()``.

        NOTE: ``_notify()`` is only invoked after ``run()`` returns normally,
        so a waiter is not woken when ``run()`` raises or is cancelled.
        """
        # ``async with`` guarantees the underlying lock is released even when
        # the waiter is cancelled while blocked in ``wait()``; a leaked lock
        # would block every later ``_notify()``.
        async with self._update_event:
            await self._update_event.wait()

    def cancel(self):
        """Request cancellation of the underlying asyncio task."""
        self._task.cancel()

    def set_state(self, state):
        """Record ``state`` and log the transition at debug level."""
        self.logger.debug("%s: %s -> %s", self._objname, self._state, state)
        self._state = state

    @abc.abstractmethod
    async def run(self):
        """
        Services must provide this implementation
        """
        raise NotImplementedErrorException("run")

    async def cleanup(self):
        """
        Services can override this to free resources
        """
        pass

    async def start(self):
        """
        Drive the task through its life cycle.

        Sets the state to RUN, executes ``_run()``, and always performs
        cleanup afterwards. A cancellation is absorbed (state becomes
        CANCELED and the error is not re-raised); any other exception is
        logged and re-raised.
        """
        self.set_state(State.RUN)
        try:
            await self._run()
        except asyncio.CancelledError:
            self.set_state(State.CANCELED)
        except Exception as e:
            self.logger.error("Exception: %s", e, exc_info=True)
            raise
        finally:
            try:
                await self.cleanup()
            finally:
                # Run the remaining teardown even if cleanup() itself fails,
                # so the executor and the registry entry are not leaked.
                if self._executor is not None:
                    self._executor.shutdown()
                self.set_state(State.STOP)
                # Only drop our own registration: another task created with
                # the same name may have replaced (or already removed) it.
                if self._ALL_TASKS.get(self._objname) is self:
                    del self._ALL_TASKS[self._objname]

    async def _run(self):
        """Execute ``run()`` once, then wake up coroutines in ``wait()``."""
        await self.run()
        await self._notify()

    async def _notify(self):
        """
        Notify coroutines waiting on this service
        """
        async with self._update_event:
            self._update_event.notify_all()


class PeriodicServiceTask(ServiceTask):
    """
    A periodic version of a ServiceTask

    It will call the run method, at specified intervals
    """

    # Default interval between two run() invocations, in seconds.
    PERIOD = 5 * 60

    def __init__(self, service, name=None, period=None, executor=None):
        """
        Args:
            service: owning service object (see ``ServiceObj``).
            name: task name; defaults to the class name.
            period: seconds to sleep between runs. Any falsy value
                (including ``0``) selects the class default ``PERIOD``.
            executor: optional executor, forwarded to ``ServiceTask``.
        """
        super().__init__(service, name, executor=executor)
        self._period = period or self.PERIOD

    async def _run(self):
        """Loop forever: ``run()``, notify waiters, sleep for the period."""
        while True:
            await self.run()
            await self._notify()
            await asyncio.sleep(self._period, **_loop_kwargs(self.loop))