jobs/fxci-taskcluster-export/fxci_etl/pulse/consume.py (50 lines of code) (raw):
from kombu import Connection, Exchange, Queue
from loguru import logger
from fxci_etl.config import Config
from fxci_etl.pulse.handler import BigQueryHandler, PulseHandler
def get_connection(config: Config):
    """Open an SSL-wrapped AMQP connection to the configured pulse broker.

    Requires ``config.pulse`` to be populated; the connection is created
    lazily by kombu and only established on first use.
    """
    assert config.pulse
    settings = config.pulse
    connection = Connection(
        hostname=settings.host,
        port=settings.port,
        userid=settings.user,
        password=settings.password,
        ssl=True,
    )
    return connection
def get_consumer(
    config: Config, connection: Connection, name: str, callbacks: list[PulseHandler]
):
    """Create a kombu consumer bound to the pulse queue configured under *name*.

    Looks up the exchange and routing key from ``config.pulse.queues[name]``,
    verifies the exchange exists, declares (or re-attaches to) the user's
    queue, binds it, and returns a consumer that dispatches messages to
    *callbacks*.
    """
    assert config.pulse
    pulse = config.pulse
    qconf = pulse.queues[name]
    exchange = Exchange(qconf.exchange, type="topic")
    exchange(connection).declare(
        passive=True
    )  # raise an error if exchange doesn't exist
    # Queue name follows the pulse convention "queue/<user>/<name>".
    # Non-durable queues are auto-deleted when the last consumer disconnects.
    queue = Queue(
        name=f"queue/{pulse.user}/{name}",
        exchange=exchange,
        routing_key=qconf.routing_key,
        durable=pulse.durable,
        exclusive=False,
        auto_delete=not pulse.durable,
    )
    # auto_declare=False: we declare and bind explicitly below so we can
    # inspect the declare result (pending message count) first.
    consumer = connection.Consumer(queue, auto_declare=False, callbacks=callbacks)
    qinfo = consumer.queues[0].queue_declare()
    logger.debug(f"{qinfo.message_count} pending messages")
    consumer.queues[0].queue_bind()
    return consumer
def drain(config: Config, name: str, callbacks: list[PulseHandler]):
    """Consume messages from the named pulse queue until it is nearly empty.

    Drains events in one-second windows; when a window passes with no
    messages, the queue depth is checked and draining stops once fewer
    than 100 messages remain. Afterwards each handler's buffer is flushed.
    """
    logger.info(f"Draining pulse queue {name}")
    with get_connection(config) as connection:
        with get_consumer(config, connection, name, callbacks) as consumer:
            queue = consumer.queues[0]
            draining = True
            while draining:
                try:
                    connection.drain_events(timeout=1)
                except TimeoutError:
                    # Quiet for a second — check how much is left before
                    # deciding whether to keep going.
                    pending = queue.queue_declare(passive=True).message_count
                    draining = pending >= 100
    # Flush any records the handlers buffered during the drain.
    for callback in callbacks:
        callback.process_buffer()