runnable-hub/python/runnable_hub/queue/redis.py (17 lines of code) (raw):

import os
from ..interface import RunnableQueue, RunnableQueueBus
import redis.asyncio as redis


class RunnableRedisQueue(RunnableQueue):
    """A named queue stored as a Redis list.

    Messages are appended with ``RPUSH`` and taken with ``BLPOP``, so they
    are consumed in the order they were sent.
    """

    def __init__(self, name, client):
        """Bind the queue to a Redis list key and an async Redis client.

        Args:
            name: Key of the Redis list that holds the queued messages.
            client: Async Redis client used for every queue operation.
        """
        self.name = name
        self.client = client

    async def send(self, content: str):
        """Append ``content`` to the tail of the list stored at ``self.name``."""
        await self.client.rpush(self.name, content)

    async def receive(self):
        """Wait for the next message and return it as text.

        Returns:
            The payload popped from the head of the list, decoded as UTF-8.
        """
        # timeout=0 asks BLPOP to block until an element is available.
        popped = await self.client.blpop(self.name, timeout=0)
        # BLPOP replies with (key, value); only the value is the message.
        # NOTE(review): .decode() assumes the client returns bytes, i.e. it
        # was not created with decode_responses=True -- confirm for clients
        # constructed outside this module.
        payload = popped[1]
        return payload.decode('utf-8')


class RunnableRedisQueueBus(RunnableQueueBus):
    """Creates Redis-backed queues that all draw from one connection pool."""

    def __init__(self, host: str, port: int, db: int):
        """Create the connection pool used by every queue this bus registers.

        Args:
            host: Redis server host.
            port: Redis server port.
            db: Redis database index.
        """
        self.pool = redis.ConnectionPool(host=host, port=port, db=db)

    def register(self, name: str):
        """Return a ``RunnableRedisQueue`` for the list key ``name``.

        Each call builds a new client object on top of the shared pool.
        """
        client = redis.Redis(connection_pool=self.pool)
        return RunnableRedisQueue(name, client)