def notify()

in cdsreaper/messagesender.py [0:0]


    def notify(self, routing_key: str, msg_content: dict, attempt=1)->bool:
        """
        send the given message (formatted to json) to the given routing key on the exchange configured at construction.
        this will wait for a delivery confirmation to be received from the broker before returning.
        This can raise a JSON encoding exception if the message is not serializable, or a RuntimeError if the sending retries have
        been exceeded
        :param routing_key:
        :param msg_content:
        :param attempt: don't set this, it is used internally as a retry counter
        :return: boolean indicating if the message was sent or not. Assume unrecoverable error if false.
        """
        error_exit = False
        try:
            logger.debug("Sending {0} via {1} to {2}".format(msg_content, routing_key, self.exchange))
            string_content = json.dumps(msg_content)
            self._channel.basic_publish(self.exchange, routing_key, string_content.encode(encoding="UTF-8"))
            return True
        except pika.exceptions.BodyTooLongError as e:
            logger.error("Could not send message {0} as the body is too long for the server".format(msg_content))
            return False

        except pika.exceptions.UnroutableError as e:
            if attempt >= self.max_retry_attempts:
                logger.error("Could not deliver message after {0} attempts: {1}, exiting".format(attempt, str(e)))
                error_exit = True
            else:
                retry_delay = 5*attempt
                logger.error("Could not send message on attempt {0}: {1}. Retrying in {2} seconds".format(attempt, str(e), retry_delay))
                time.sleep(retry_delay)
                return self.notify(routing_key, msg_content, attempt+1)
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPHeartbeatTimeout) as e:
            if attempt >= self.max_retry_attempts:
                logger.error("Could not deliver message after {0} attempts: {1}, exiting".format(attempt, str(e)))
                error_exit = True
            else:
                logger.error("Connection error: {0}. Attempting to re-open....".format(str(e)))
                self._setup_channel()
                return self.notify(routing_key, msg_content, attempt+1)

        if error_exit:  #avoid ugly "exception handling this exception" messages
            raise RuntimeError("Could not deliver message after {0} retries".format(self.max_retry_attempts))