nubia/internal/io/eventbus.py (18 lines of code) (raw):

#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#

import logging
import traceback

from nubia.internal.helpers import try_await

logger = logging.getLogger(__name__)


class Message:
    """Identifiers for the messages that a ``Listener`` can react to."""

    # Compared against the ``msg`` argument in ``Listener.react``.
    CONNECTED = 1


class Listener:
    """Base class for objects that react to ``Message`` notifications.

    Subclasses are expected to override ``on_connected``; the default
    implementation raises ``NotImplementedError``.
    """

    async def react(self, msg, *args, **kwargs):
        """Dispatch ``msg`` to the matching handler.

        Only ``Message.CONNECTED`` is handled: it is forwarded to
        ``on_connected`` together with ``args`` and ``kwargs``. Any other
        message is ignored.

        Raises:
            NotImplementedError: propagated unchanged when the handler
                raises it (e.g. ``on_connected`` was not overridden).
        """
        if msg == Message.CONNECTED:
            try:
                # NOTE(review): try_await is a project helper; presumably it
                # lets on_connected be either sync or async -- confirm.
                await try_await(self.on_connected(*args, **kwargs))
            except NotImplementedError:
                # A missing handler must stay visible to the caller rather
                # than being logged and swallowed below.
                raise
            except Exception as e:
                # Best effort: any other failure is reported but not raised.
                logger.info("Couldn't initialize %s: %s", type(self), e)
                traceback.print_exc()

    async def on_connected(self, *args, **kwargs):
        """Handle ``Message.CONNECTED``; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Listeners must implement on_connected method")