in cdsreaper/messagesender.py [0:0]
def notify(self, routing_key: str, msg_content: dict, attempt: int = 1) -> bool:
    """
    Publish msg_content (serialised to JSON, UTF-8 encoded) to the given routing key on the
    exchange configured at construction.

    Whether this blocks until the broker confirms delivery depends on how the channel was
    configured, which is not visible here (presumably _setup_channel enables publisher
    confirms, since UnroutableError is handled below -- TODO confirm).

    Failure handling:
      - BodyTooLongError: logged, returns False, never retried.
      - UnroutableError: sleeps 5*attempt seconds, then retries.
      - AMQPConnectionError / AMQPHeartbeatTimeout: re-opens the channel via
        self._setup_channel(), then retries immediately.
    Once `attempt` reaches self.max_retry_attempts a RuntimeError is raised instead of retrying.

    Retries run in a loop rather than by recursion. This keeps the final RuntimeError from being
    raised while an outer `except` block is still active, which would chain it to the pika
    error ("During handling of the above exception, another exception occurred").

    :param routing_key: routing key to publish the message with
    :param msg_content: message body; must be JSON serialisable
    :param attempt: don't set this, it is the starting value of the internal retry counter
    :return: True if the message was published, False if the broker rejected the body as too long.
        Assume unrecoverable error if False.
    :raises RuntimeError: if the retry attempts have been exhausted
    :raises TypeError, ValueError: from json.dumps if msg_content cannot be serialised (not caught here)
    """
    while True:
        logger.debug("Sending {0} via {1} to {2}".format(msg_content, routing_key, self.exchange))
        # Serialisation errors are deliberately left outside the try: they are not retryable.
        string_content = json.dumps(msg_content)
        try:
            self._channel.basic_publish(self.exchange, routing_key, string_content.encode(encoding="UTF-8"))
            return True
        except pika.exceptions.BodyTooLongError:
            logger.error("Could not send message {0} as the body is too long for the server".format(msg_content))
            return False
        except pika.exceptions.UnroutableError as e:
            # Only record the failure here; it is acted on after the handler has exited.
            failure, reconnect = str(e), False
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPHeartbeatTimeout) as e:
            failure, reconnect = str(e), True

        # From here on no exception is being handled, so the RuntimeError carries no implicit context.
        if attempt >= self.max_retry_attempts:
            logger.error("Could not deliver message after {0} attempts: {1}, exiting".format(attempt, failure))
            raise RuntimeError("Could not deliver message after {0} retries".format(self.max_retry_attempts))

        if reconnect:
            logger.error("Connection error: {0}. Attempting to re-open....".format(failure))
            # Any exception raised while re-opening the channel propagates to the caller.
            self._setup_channel()
        else:
            # Linear back-off: wait longer on each successive attempt.
            retry_delay = 5 * attempt
            logger.error("Could not send message on attempt {0}: {1}. Retrying in {2} seconds".format(attempt, failure, retry_delay))
            time.sleep(retry_delay)
        attempt += 1