def valid_message_receive()

in cdsresponder/rabbitmq/UploadRequestedProcessor.py [0:0]


    def valid_message_receive(self, channel: pika.channel.Channel, exchange_name:str, routing_key:str, delivery_tag:str, body:dict):
        """
        Handle an upload request: validate its inmeta document, launch a CDS job for it and report the outcome.

        The inmeta document in `body["inmeta"]` is checked with `self.validate_inmeta`. If it passes, it is
        written out with `self.write_out_inmeta` and a job is started through `self.launcher.launch_cds_job`
        on the route given by `body["routename"]`. `body` is updated in place and passed to
        `self.inform_job_status` with a status of "started" on success or "invalid" on failure.

        :param channel: channel handed on to `self.inform_job_status` when reporting the job status
        :param exchange_name: name of the exchange the message came from (only used for logging here)
        :param routing_key: routing key of the message (only used for logging here)
        :param delivery_tag: delivery tag of the message (only used for logging here)
        :param body: message content. Must contain "inmeta"; "routename" is read when launching the job.
            "filename", "online_id", "nearline_id", "archive_id", "deliverable_asset" and "deliverable_bundle"
            are optional. On return or raise it may additionally carry "job-id", "job-name", "job-namespace",
            "error" and "traceback".
        :raises MessageProcessor.NackMessage: if the inmeta does not validate, if the job could not be launched,
            or if the job launched but the "started" status could not be sent.
        """
        logger.info("Received upload request from {0} with key {1} and delivery tag {2}".format(exchange_name, routing_key, delivery_tag))

        if not self.validate_inmeta(body["inmeta"]):
            logger.error("inmeta term did not validate as an xml inmeta document: {0}".format(self.xsd_validator.error_log))
            logger.error("Offending content was {0}".format(body["inmeta"]))
            # Store the validator log as text, the same way the launch-failure path below stores str(error),
            # so that "error" is always a plain string in the outgoing status message.
            body["error"] = str(self.xsd_validator.error_log)
            self.inform_job_status(channel, "invalid", body)
            raise MessageProcessor.NackMessage

        # Use the first identifier that is present and not None, in order of preference, as a
        # human-readable hint for naming; only fall back to a random string when there is none.
        filename_hint = next(
            (body[key] for key in ("filename", "online_id", "nearline_id", "archive_id") if body.get(key) is not None),
            None,
        )
        if filename_hint is None:
            filename_hint = self.randomstring(10)

        labels = {
            "deliverable-asset-id": str(body["deliverable_asset"]) if "deliverable_asset" in body else "None",
            "deliverable-bundle-id": str(body["deliverable_bundle"]) if "deliverable_bundle" in body else "None",
            "online-id": self.make_safe_label(str(body["online_id"])) if "online_id" in body else "None",
            "nearline-id": self.make_safe_label(str(body["nearline_id"])) if "nearline_id" in body else "None",
            "archive-id": self.make_safe_label(str(body["archive_id"])) if "archive_id" in body else "None",
        }

        inmeta_file = self.write_out_inmeta(self.launcher.sanitise_job_name(filename_hint), body["inmeta"])
        # NOTE(review): the job name is built from the unsanitised hint whereas the inmeta file uses the
        # sanitised one - presumably launch_cds_job sanitises the name itself; confirm.
        job_name = "cds-{0}-{1}".format(filename_hint, self.randomstring(4))
        try:
            result = self.launcher.launch_cds_job(inmeta_file, job_name, body["routename"], labels)
            body["job-id"] = result.metadata.uid
            body["job-name"] = result.metadata.name
            body["job-namespace"] = result.metadata.namespace
        except Exception as launch_err:
            # Capture the failure details straight away, before any cleanup code gets a chance to raise.
            error_text = str(launch_err)
            error_trace = traceback.format_exc()
            logger.error("Could not launch job for {0}: {1}".format(body, error_text))

            # Cleanup is best-effort: a failure to delete the file must not stop us from reporting
            # the job as invalid and nacking the message.
            try:
                os.remove(inmeta_file)
            except OSError as cleanup_err:
                logger.warning("Could not remove inmeta file {0}: {1}".format(inmeta_file, cleanup_err))

            try:
                body["job-name"] = job_name
                body["error"] = error_text
                body["traceback"] = error_trace
                self.inform_job_status(channel, "invalid", body)
            except Exception as inform_err:
                logger.error("Could not inform exchange of job failure: {0}".format(inform_err))
            raise MessageProcessor.NackMessage

        try:
            self.inform_job_status(channel, "started", body)
        except Exception as e:
            logger.error("Job started but could not inform exchange: {0}".format(e))
            raise MessageProcessor.NackMessage