ingestion-edge/ingestion_edge/flush.py (102 lines of code) (raw):
"""Fallback logic for retrying queued submit requests."""
from dataclasses import dataclass
from functools import partial
from typing import Dict, Optional, Tuple
from google.cloud.pubsub_v1 import PublisherClient
from persistqueue import SQLiteAckQueue
from persistqueue.exceptions import Empty
from sanic import Sanic
import asyncio
import uvloop
from .publish import get_client, get_queue
from .config import get_config_dict, logger
@dataclass
class Flush:
    """Class to store state for background flush task."""

    # publisher used to send queued messages
    client: PublisherClient
    # persistent queue of (topic, data, attrs) tuples awaiting delivery
    q: SQLiteAckQueue
    # stop adding messages to a batch once this many data bytes are pending
    concurrent_bytes: int
    # maximum number of messages published per _flush call
    concurrent_messages: int
    # delay between flush rounds, also used as backoff after a failed flush
    sleep_seconds: float
    # set by run_forever, cleared by after_server_stop to stop the loop
    running: bool = False
    # background run_forever task created by before_server_start
    task: Optional[asyncio.Task] = None
    # current inter-flush sleep; cancelled by after_server_stop to wake the loop
    sleep_task: Optional[asyncio.Task] = None

    def set_status(
        self, message: Tuple[str, bytes, Dict[str, str]], future: asyncio.Future
    ):
        """Ack or nack message in self.q according to the outcome of future.

        Used as a done callback on publish futures, so it never raises.
        """
        try:
            # raises if publishing failed or the future was cancelled
            future.result()
        except:  # noqa: E722
            # message was not delivered; nack it so it can be retried
            self.q.nack(message)
            # do not raise in callback
        else:
            # message delivered
            self.q.ack(message)

    async def _flush(self):
        """Read messages from self.q and pass them to client.publish for batching.

        Wait for all pending messages when an exception is thrown and every time
        self.concurrent_messages or self.concurrent_bytes is reached or exceeded.

        Clear acked data each time pending messages are awaited.

        Returns the number of messages published in this batch (0 when the
        queue was empty). Re-raises the first publish error after every
        pending publish has settled.
        """
        pending, total_bytes = [], 0
        try:
            # send one batch to client
            for _ in range(self.concurrent_messages):
                if total_bytes >= self.concurrent_bytes:
                    # batch complete
                    break
                try:
                    # get next message
                    message = self.q.get(block=False)
                except Empty:
                    # batch complete
                    break
                try:
                    # extract values
                    topic, data, attrs = message
                    # record size of message
                    total_bytes += len(data)
                    # publish message
                    future = self.client.publish(topic, data, **attrs)
                    # ack or nack by callback
                    future.add_done_callback(partial(self.set_status, message))
                    # wait for this later
                    pending.append(future)
                except Exception:
                    # don't leave message unacked in q
                    self.q.nack(message)
                    raise
            # wait for pending operations and raise on first exception
            await asyncio.gather(*pending)
        except:  # noqa: E722
            # wait for pending operations but don't raise exceptions, so every
            # message is acked or nacked before the error propagates
            await asyncio.gather(*pending, return_exceptions=True)
            raise  # from bare except
        else:
            # clean up disk space
            self.q.clear_acked_data(max_delete=None, keep_latest=None)
            # return number of messages flushed
            return len(pending)

    async def run_forever(self):
        """Periodically call flush.

        Sleep for sleep_seconds every time the queue is empty or an exception
        is thrown.

        Must not exit unless self.running is False or an interrupt is received.
        """
        self.running = True
        while self.running:
            try:
                # flush until the queue is empty or a publish fails
                while self.running:
                    if not await self._flush():
                        break
            except (Exception, asyncio.CancelledError):
                # ignore exceptions; failed messages were nacked and stay in q.
                # CancelledError is listed explicitly because it is not an
                # Exception subclass on Python 3.8+ (e.g. a cancelled publish
                # future surfacing through asyncio.gather).
                pass
            if not self.running:
                # shutdown requested; don't delay after_server_stop with a sleep
                break
            try:
                # wait between flushes, including after a failed flush
                self.sleep_task = asyncio.create_task(
                    asyncio.sleep(self.sleep_seconds)
                )
                await self.sleep_task
            except (Exception, asyncio.CancelledError):
                # after_server_stop cancels the sleep to wake this loop; the
                # while condition then observes self.running == False
                pass

    async def before_server_start(self, _, loop: uvloop.Loop):
        """Execute self.run_forever() in the background."""
        self.task = loop.create_task(self.run_forever())

    async def after_server_stop(self, *_):
        """Stop the background loop, then flush until the queue is empty."""
        # prevent further flushing
        self.running = False
        # wake run_forever if it is sleeping between flushes
        if self.sleep_task is not None:
            self.sleep_task.cancel()
        # wait for an in-progress flush to finish; task is None if
        # before_server_start never ran
        if self.task is not None:
            await self.task
        # flush until empty
        while await self._flush():
            pass

    async def run_until_complete(self):
        """Call flush until queue is empty.

        After a failed flush, sleep for sleep_seconds before retrying so an
        unavailable pubsub is not retried (and logged) in a tight loop.
        """
        while not self.q.empty():
            try:
                await self._flush()
            except Exception:
                logger.exception("pubsub unavailable")
                await asyncio.sleep(self.sleep_seconds)
def get_flush(config: dict, client: PublisherClient, q: SQLiteAckQueue) -> Flush:
    """Build a Flush from every ``FLUSH_*`` entry in config.

    The ``FLUSH_`` prefix is stripped and the remainder lowercased, so e.g.
    ``FLUSH_SLEEP_SECONDS`` populates the ``sleep_seconds`` field.
    """
    prefix = "FLUSH_"
    options = {}
    for key, value in config.items():
        if not key.startswith(prefix):
            continue
        options[key[len(prefix):].lower()] = value
    return Flush(client, q, **options)
def init_app(app: Sanic, client: PublisherClient, q: SQLiteAckQueue):
    """Register background-flush listeners on a Sanic app.

    The periodic flush is scheduled before the server starts, and a final
    flush runs after the server stops.
    """
    flush = get_flush(app.config, client, q)
    listeners = (
        # schedule periodic flush in background on app start
        ("before_server_start", flush.before_server_start),
        # schedule flush on shutdown
        ("after_server_stop", flush.after_server_stop),
    )
    for event, handler in listeners:
        app.listener(event)(handler)  # type: ignore
def main():
    """Flush until queue is empty."""
    config = get_config_dict()
    flush = get_flush(config, get_client(config), get_queue(config))
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(flush.run_until_complete())
    finally:
        # release the loop's selector and executor even if flushing is interrupted
        loop.close()


if __name__ == "__main__":
    main()