src/alerter/rabbitmq_admin.py (56 lines of code) (raw):

import requests import requests.auth import logging import time from typing import Optional from urllib.parse import urlparse logger = logging.getLogger(__name__) logger.level = logging.INFO class RabbitMQConfig(object): def __init__(self, host: str, port: Optional[int], vhost:str, ssl:bool, credentials: requests.auth.AuthBase): self.host = host if port is None: self.port = 15672 else: self.port = port self.vhost = vhost self.ssl = ssl self.credentials = credentials def make_url(self, section:str, url_tail:str) -> str: if self.ssl: proto = "https" else: proto = "http" return "{0}://{1}:{2}/api/{3}/{4}/{5}".format(proto, self.host, self.port, section, self.vhost, url_tail) @staticmethod def load_from_files(filepath: str, want_ssl: bool): """ Initialises RabbitMQ config from text files in a directory. This is intended to be used from Kubernetes, where we can mount a secrets manifest as a directory path and read its contents from files. :param filepath: directory to load the files from. An IOError will be raised if any of the expected files do not exist here. :param want_ssl: if this is set, then use HTTPS for accessing the management port (recommended!) :return: a RabbitMQConfig object. Raises an exception if there is an error. """ with open(filepath + "/rabbitmq_client_uri", "r") as f: content = f.read() parsed_url = urlparse(content) if parsed_url.scheme != "amqp": logger.error("Expected rabbitmq uri scheme to be 'amqp' but got {0}".format(parsed_url.scheme)) raise ValueError("scheme is incorrect") if parsed_url.path == "": logger.error("Expected a virtualhost in the rabbitmq uri at {0}".format(filepath)) raise ValueError("no virtualhost for rabbitmq") # we don't use the port from the parsed url as that is for the AMQP port not the management port return RabbitMQConfig(parsed_url.hostname, None, parsed_url.path.lstrip("/ "), want_ssl, requests.auth.HTTPBasicAuth(parsed_url.username, parsed_url.password) ) def get_queued_message_count(queue_name: str, config: RabbitMQConfig, retry_limit=10, retry_counter=0) -> int: """ requests the current message count for the given queue from the server. :param queue_name: name of the queue to request :param config: RabbitMQConfig :param retry_limit: maximum number of times to retry, defaults to 10 :param retry_counter: current retry counter. Don't specify this when calling :return: the queued message count. Throws an exception on failure and will retry on transient errors """ if retry_counter > retry_limit: logger.error("Could not succeed after {0} attempts, giving up".format(retry_counter)) raise RuntimeError("Could not contact RabbitMQ and exhausted retries") dest_url = config.make_url("queues", queue_name) logger.debug("Making request to {0}".format(dest_url)) response = requests.get(dest_url, auth=config.credentials) if response.status_code == 200: content = response.json() return content["messages_ready"] elif response.status_code == 503 or response.status_code == 504: logger.warning("Rabbit MQ is not available at the moment ({0} returned 503). Retrying in a few seconds...") time.sleep(3) return get_queued_message_count(queue_name, config, retry_limit, retry_counter+1) else: logger.error("Could not contact RabbitMQ at {0}, server returned {1} {2}".format(dest_url, response.status_code, response.text)) raise RuntimeError("RabbitMQ returned an error, see the logs for details")