in cdsresponder/rabbitmq/UploadRequestedProcessor.py [0:0]
def valid_message_receive(self, channel: pika.channel.Channel, exchange_name:str, routing_key:str, delivery_tag:str, body:dict):
logger.info("Received upload request from {0} with key {1} and delivery tag {2}".format(exchange_name, routing_key, delivery_tag))
if not self.validate_inmeta(body["inmeta"]):
logger.error("inmeta term did not validate as an xml inmeta document: {0}".format(self.xsd_validator.error_log))
logger.error("Offending content was {0}".format(body["inmeta"]))
body["error"] = self.xsd_validator.error_log
self.inform_job_status(channel, "invalid", body)
raise MessageProcessor.NackMessage
if "filename" in body and body["filename"] is not None:
filename_hint = body["filename"]
elif "online_id" in body and body["online_id"] is not None:
filename_hint = body["online_id"]
elif "nearline_id" in body and body["nearline_id"] is not None:
filename_hint = body["nearline_id"]
elif "archive_id" in body and body["archive_id"] is not None:
filename_hint = body["archive_id"]
else:
filename_hint = self.randomstring(10)
labels = {
"deliverable-asset-id": str(body["deliverable_asset"]) if "deliverable_asset" in body else "None",
"deliverable-bundle-id": str(body["deliverable_bundle"]) if "deliverable_bundle" in body else "None",
"online-id": self.make_safe_label(str(body["online_id"])) if "online_id" in body else "None",
"nearline-id": self.make_safe_label(str(body["nearline_id"])) if "nearline_id" in body else "None",
"archive-id": self.make_safe_label(str(body["archive_id"])) if "archive_id" in body else "None",
}
inmeta_file = self.write_out_inmeta(self.launcher.sanitise_job_name(filename_hint), body["inmeta"])
job_name = "cds-{0}-{1}".format(filename_hint, self.randomstring(4))
try:
result = self.launcher.launch_cds_job(inmeta_file, job_name, body["routename"], labels)
body["job-id"] = result.metadata.uid
body["job-name"] = result.metadata.name
body["job-namespace"] = result.metadata.namespace
except Exception as e:
logger.error("Could not launch job for {0}: {1}".format(body, str(e)))
os.remove(inmeta_file)
try:
body["job-name"] = job_name
body["error"] = str(e)
body["traceback"] = traceback.format_exc()
self.inform_job_status(channel, "invalid", body)
except Exception as e:
logger.error("Could not inform exchange of job failure: {0}".format(e))
raise MessageProcessor.NackMessage
try:
self.inform_job_status(channel, "started", body)
except Exception as e:
logger.error("Job started but could not inform exchange: {0}".format(e))
raise MessageProcessor.NackMessage