in services/ui_backend_service/api/ws.py [0:0]
def __init__(self, app, db, event_emitter=None, queue_ttl: int = WS_QUEUE_TTL_SECONDS, cache=None):
    """Set up the websocket handler state and register its entry points.

    Args:
        app: Application object; its ``router.add_route`` is used to expose
            ``GET /ws``, served by ``self.websocket_handler``.
        db: Database handle, stored on the instance as ``self.db``.
        event_emitter: Optional emitter on which the ``'notify'`` listener
            is registered. When omitted (or falsy) a fresh
            ``AsyncIOEventEmitter`` is created and used instead.
        queue_ttl: Value forwarded to ``TTLQueue`` when building
            ``self.queue``.
        cache: Optional object exposing an ``artifact_cache`` attribute.
            When provided, it backs both the task and artifact refiners;
            otherwise both refiners are ``None``.
    """
    self.event_emitter = event_emitter or AsyncIOEventEmitter()
    self.db = db
    self.queue = TTLQueue(queue_ttl)
    self.task_refiner = TaskRefiner(cache=cache.artifact_cache) if cache else None
    self.artifact_refiner = ArtifactRefiner(cache=cache.artifact_cache) if cache else None
    self.logger = logging.getLogger("Websocket")

    # Subscribe on the resolved emitter, not on the raw argument: the
    # argument defaults to None, and calling `.on` on it would raise
    # AttributeError before the handler was ever registered.
    self.event_emitter.on('notify', self.event_handler)
    app.router.add_route('GET', '/ws', self.websocket_handler)
    self.loop = asyncio.get_event_loop()