ingestion-edge/ingestion_edge/publish.py (109 lines of code) (raw):

"""Main logic for handling submit requests.""" from datetime import datetime from sqlite3 import DatabaseError from sanic import Sanic, response from sanic.request import Request from functools import partial from google.api_core.retry import Retry, if_exception_type from google.cloud.pubsub_v1 import PublisherClient from persistqueue import SQLiteAckQueue from typing import Callable, Dict, Optional, Tuple from .config import logger from .util import AsyncioBatch, HTTP_STATUS import google.api_core.exceptions TRANSIENT_ERRORS = if_exception_type( # Service initiated retry google.api_core.exceptions.Aborted, # Service interrupted when handling the request google.api_core.exceptions.Cancelled, # Service throttled the request google.api_core.exceptions.TooManyRequests, # Service outage or connection issue google.api_core.exceptions.ServerError, ) def _legacy_encoding(value: Optional[str], request: Request) -> Optional[str]: """Convert string from UTF-8 with surrogate escape characters to ISO-8859-1. As of Sanic 21.3.0 in sanic.http.http1 headers are decoded with the default codec (UTF-8) and errors="surrogateescape", but behavior prior to that was to decode headers with ISO-8859-1, aka latin-1. Unit tests hit decode headers in sanic.http.http1, but prod and some integration tests decode headers in sanic.asgi with the latin-1 codec. It's not possible to reliably determine which codec was used from the string alone, but requests built in sanic.http.http1 set request.head, while those built in sanic.asgi don't set request.head, so it defaults to b"". Thus request.head being empty can be used as a proxy to determine whether legacy encoding needs to be restored. Surrogate escape characters cause exceptions to be thrown if they are encoded back to bytes without specifying an error handling strategy, such as when checking the byte length of individual headers, and when encoding the value as protobuf to be sent to PubSub. This changes the decoding back to ISO-8859-1 when request.head is present to preserve legacy behavior and ensure all character sequences are valid and safe to encode as UTF-8. """ if value is None or request.head == b"": # header not present, or already using desired codec return value # Sanic used the default codec to decode value, so don't explicitly specify a codec # for value.encode, even though it is expected to be UTF-8 return value.encode(errors="surrogateescape").decode("ISO-8859-1") async def submit( request: Request, client: PublisherClient, q: SQLiteAckQueue, topic: str, metadata_headers: Dict[str, str], **kwargs, ) -> response.HTTPResponse: """Deliver request to the pubsub topic. Deliver to the local queue to be retried on transient errors. """ data = request.body attrs = { key: value for key, value in dict( submission_timestamp=datetime.utcnow().isoformat() + "Z", uri=request.path, protocol="HTTP/" + request.version, method=request.method, args=request.query_string, remote_addr=request.ip, host=request.host, **{ attr: _legacy_encoding(request.headers.get(header), request) for header, attr in metadata_headers.items() }, ).items() if value is not None } # assert valid pubsub message for value in attrs.values(): if len(value.encode("utf8")) > 1024: # attribute exceeds value size limit of 1024 bytes # https://cloud.google.com/pubsub/quotas#resource_limits return response.text( "header too large\n", HTTP_STATUS.REQUEST_HEADER_FIELDS_TOO_LARGE ) try: await client.publish(topic, data, **attrs) except ValueError: return response.text("payload too large\n", HTTP_STATUS.PAYLOAD_TOO_LARGE) except Exception: # api call failure, write to queue logger.exception("pubsub unavailable") try: q.put((topic, data, attrs)) except DatabaseError: logger.exception("queue full") # sqlite queue is probably out of space return response.text("", HTTP_STATUS.INSUFFICIENT_STORAGE) return response.text("") def get_client(config: dict) -> PublisherClient: """Create a pubsub client.""" # Initialize PubSub client timeout = config.get("PUBLISH_TIMEOUT_SECONDS", None) client = PublisherClient() client.api.publish = partial( client.api.publish, retry=Retry(TRANSIENT_ERRORS, deadline=timeout), timeout=timeout, ) client._batch_class = AsyncioBatch return client def get_queue(config: dict) -> SQLiteAckQueue: """Create a SQLiteAckQueue. Use a SQLiteAckQueue because: * we use acks to ensure messages only removed on success * persist-queue's SQLite*Queue is faster than its Queue * SQLite provides thread-safe and process-safe access """ queue_config = { key[6:].lower(): value for key, value in config.items() if key.startswith("QUEUE_") } q: SQLiteAckQueue = SQLiteAckQueue(**queue_config, auto_resume=False) q.resume_unack_tasks() # work around https://github.com/peter-wangxu/persist-queue/pull/154 q.total = q._count() return q def init_app(app: Sanic) -> Tuple[PublisherClient, SQLiteAckQueue]: """Initialize Sanic app with url rules.""" client = get_client(app.config) q = get_queue(app.config) # get metadata_headers config metadata_headers = app.config["METADATA_HEADERS"] # validate attribute keys for attribute in metadata_headers.values(): if len(attribute.encode("utf8")) > 256: # https://cloud.google.com/pubsub/quotas#resource_limits raise ValueError("Metadata attribute exceeds key size limit of 256 bytes") # add routes for ROUTE_TABLE handlers: Dict[str, Callable] = {} for route in app.config["ROUTE_TABLE"]: if route.topic not in handlers: # generate one handler per topic handlers[route.topic] = partial( submit, client=client, q=q, topic=route.topic, metadata_headers=metadata_headers, ) handlers[route.topic].__name__ = f"submit({route.topic})" app.add_route( handler=handlers[route.topic], uri=route.uri, methods=[method.upper() for method in route.methods], ) return client, q