in atomresponder/master_importer.py [0:0]
def update_pluto_record(self, item_id, job_id, content: dict, statinfo):
    """
    Publish an update message for an item to the configured RabbitMQ exchange.

    The outgoing message is `content` extended with `itemId`, `jobId` and `commissionId`
    and, when `statinfo` is given, the `size`, `atime`, `mtime` and `ctime` fields taken
    from it. It is sent to `settings.RABBITMQ_EXCHANGE` with the routing key
    `atomresponder.atom.<type>`. If the broker reports the message as unroutable the
    publish is retried every 3 seconds until it succeeds.

    `commissionId` is:
      * None if `content` carries no (or a None) `projectId`
      * -1 if the project ID is not an integer or no LinkedProject matches it
      * the id of the linked project's commission otherwise

    :param item_id: value sent as `itemId`
    :param job_id: value sent as `jobId`
    :param content: message dictionary; `type` selects the routing key (falling back to
        const.MESSAGE_TYPE_MEDIA when absent) and `projectId` drives the commission lookup
    :param statinfo: object exposing st_size, st_atime, st_mtime and st_ctime, or None
    :raises RuntimeError: if reading `projectId` from `content` raises KeyError
    :return: None
    """
    (conn, channel) = self.setup_pika_channel()
    try:
        if 'type' not in content:
            logger.error("Content dictionary had no type information! Using video-upload")
            message_type = const.MESSAGE_TYPE_MEDIA
        else:
            message_type = content['type']
        routingkey = "atomresponder.atom.{}".format(message_type)

        # don't bother with a lookup if there is no project ID
        if content.get("projectId") is not None:
            try:
                linked_project = LinkedProject.objects.get(project_id=int(content['projectId']))
                commission_id = linked_project.commission.id
            except ValueError:
                logger.error("Project ID {} does not convert to integer!".format(content['projectId']))
                commission_id = -1
            except KeyError:
                # not expected after the presence check above; kept as a guard so callers
                # still get the same RuntimeError they may be catching
                logger.error("Content has no projectId? invalid message.")
                raise RuntimeError("Invalid message")
            except LinkedProject.DoesNotExist:
                logger.warning("No linked project found for project ID {}".format(content['projectId']))
                commission_id = -1
        else:
            commission_id = None

        if statinfo is not None:
            statpart = {
                "size": statinfo.st_size,
                "atime": statinfo.st_atime,
                "mtime": statinfo.st_mtime,
                "ctime": statinfo.st_ctime,
            }
        else:
            statpart = {}

        # later entries win on key clashes, so the explicit fields override `content`
        message_to_send = {
            **content,
            "itemId": item_id,
            "jobId": job_id,
            "commissionId": commission_id,
            **statpart,
        }
        # the payload does not change between retries, so serialise it only once
        payload = json.dumps(message_to_send).encode("UTF-8")

        while True:
            try:
                logger.info("Updating exchange {} with routing-key {}...".format(settings.RABBITMQ_EXCHANGE, routingkey))
                channel.basic_publish(settings.RABBITMQ_EXCHANGE, routingkey, payload)
                break
            except pika.exceptions.UnroutableError:
                logger.error("Could not route message to broker, retrying in 3s...")
                time.sleep(3)
    finally:
        # Always release the channel and connection, on failure as well as on success;
        # closing also makes sure everything gets flushed nicely.
        # NOTE(review): assumes setup_pika_channel returns pika blocking connection/channel
        # objects, which expose `is_open` - confirm. The guard avoids raising a second
        # error (masking the first) when a failure has already closed them.
        try:
            if channel.is_open:
                channel.close()
        finally:
            if conn.is_open:
                conn.close()