cdsresponder/rabbitmq/UploadRequestedProcessor.py (160 lines of code) (raw):
from .messageprocessor import MessageProcessor
import logging
import lxml.etree as xml
import os
import pathlib
import random
import string
import pika
import traceback
import re
logger = logging.getLogger(__name__)
class UploadRequestedProcessor(MessageProcessor):
my_exchange = "cdsresponder"
routing_key = "deliverables.syndication.*.upload"
schema = {
"type": "object",
"properties": {
"deliverable_asset": {
"type": "integer"
},
"deliverable_bundle": {
"type": "integer"
},
"filename": {
"type": ["string","null"]
},
"online_id": {
"type": ["string","null"]
},
"nearline_id": {
"type": ["string","null"]
},
"archive_id": {
"type": ["string","null"]
},
"inmeta": {
"type": "string"
},
"routename": {
"type": "string"
}
},
"required": ["inmeta","routename"]
}
def __init__(self):
from cds.cds_launcher import CDSLauncher #imported here so that it can be patched out during testing
self.xsd_validator = xml.XMLSchema(file=UploadRequestedProcessor.find_inmeta_xsd())
self.launcher = CDSLauncher(os.getenv("NAMESPACE")) #NAMESPACE arg is only used if we are not in-cluster
@staticmethod
def find_inmeta_xsd():
from_config = os.getenv("INMETA_XSD")
if from_config is not None:
return from_config
return os.path.join(
pathlib.Path(__file__).parent.parent.absolute(),
"inmeta.xsd"
)
def validate_inmeta(self, content:str)->bool:
try:
parsed_xml = xml.fromstring(content)
return self.xsd_validator.validate(parsed_xml)
except xml.ParseError as e:
logger.error("Incoming inmeta data did not parse as XML: {0}".format(str(e)))
return False
def build_filename(self, path:str, filename_hint:str)->str:
initial_filename = os.path.join(path, filename_hint + ".inmeta")
if not os.path.exists(initial_filename):
return initial_filename
i=1
while True:
test_filename = os.path.join(path, filename_hint + "-" + str(i) + ".inmeta")
if not os.path.exists(test_filename):
return test_filename
i+=1
if i>=1000:
logger.error("Reached 1,000 iterations and the file {0} still exists, something must have gone wrong".format(test_filename))
raise RuntimeError("Could not build target filename")
def write_out_inmeta(self, filename_hint:str, content:str)->str:
basepath = os.getenv("INMETA_PATH")
if basepath is None:
logger.error("INMETA_PATH is not set, can't output content")
raise RuntimeError("INMETA_PATH was not set")
without_extensions = filename_hint.split(".")
if len(without_extensions)==0:
logger.error("Incoming filename '{0}' appears blank".format(filename_hint))
raise RuntimeError("Could not build target filename")
target_filename = self.build_filename(basepath, without_extensions[0])
logger.info("Writing inmeta content to {0}".format(filename_hint))
with open(target_filename, "w") as f:
f.write(content)
return target_filename
@staticmethod
def randomstring(length:int)->str:
letters = string.ascii_letters + string.digits
return "".join(random.choice(letters) for i in range(length))
def inform_job_status(self, channel: pika.channel.Channel, status: str, body: dict):
import json
channel.basic_publish(
exchange=self.my_exchange,
routing_key="cds.job.{0}".format(status),
body=json.dumps(body),
mandatory=True
)
sanitizer = re.compile(r'[^A-Za-z0-9\-_.]')
@staticmethod
def make_safe_label(content:str)->str:
"""
kubernetes labels can't be longer than 63 chars or contain anything non-alphanumeric, _ - and .
:param content: string to sanitise
:return: sanitised string
"""
sanitised = UploadRequestedProcessor.sanitizer.sub("", content)
if len(sanitised)<63:
return sanitised
else:
abbreviated=sanitised[0:60]
return abbreviated+"..."
def valid_message_receive(self, channel: pika.channel.Channel, exchange_name:str, routing_key:str, delivery_tag:str, body:dict):
logger.info("Received upload request from {0} with key {1} and delivery tag {2}".format(exchange_name, routing_key, delivery_tag))
if not self.validate_inmeta(body["inmeta"]):
logger.error("inmeta term did not validate as an xml inmeta document: {0}".format(self.xsd_validator.error_log))
logger.error("Offending content was {0}".format(body["inmeta"]))
body["error"] = self.xsd_validator.error_log
self.inform_job_status(channel, "invalid", body)
raise MessageProcessor.NackMessage
if "filename" in body and body["filename"] is not None:
filename_hint = body["filename"]
elif "online_id" in body and body["online_id"] is not None:
filename_hint = body["online_id"]
elif "nearline_id" in body and body["nearline_id"] is not None:
filename_hint = body["nearline_id"]
elif "archive_id" in body and body["archive_id"] is not None:
filename_hint = body["archive_id"]
else:
filename_hint = self.randomstring(10)
labels = {
"deliverable-asset-id": str(body["deliverable_asset"]) if "deliverable_asset" in body else "None",
"deliverable-bundle-id": str(body["deliverable_bundle"]) if "deliverable_bundle" in body else "None",
"online-id": self.make_safe_label(str(body["online_id"])) if "online_id" in body else "None",
"nearline-id": self.make_safe_label(str(body["nearline_id"])) if "nearline_id" in body else "None",
"archive-id": self.make_safe_label(str(body["archive_id"])) if "archive_id" in body else "None",
}
inmeta_file = self.write_out_inmeta(self.launcher.sanitise_job_name(filename_hint), body["inmeta"])
job_name = "cds-{0}-{1}".format(filename_hint, self.randomstring(4))
try:
result = self.launcher.launch_cds_job(inmeta_file, job_name, body["routename"], labels)
body["job-id"] = result.metadata.uid
body["job-name"] = result.metadata.name
body["job-namespace"] = result.metadata.namespace
except Exception as e:
logger.error("Could not launch job for {0}: {1}".format(body, str(e)))
os.remove(inmeta_file)
try:
body["job-name"] = job_name
body["error"] = str(e)
body["traceback"] = traceback.format_exc()
self.inform_job_status(channel, "invalid", body)
except Exception as e:
logger.error("Could not inform exchange of job failure: {0}".format(e))
raise MessageProcessor.NackMessage
try:
self.inform_job_status(channel, "started", body)
except Exception as e:
logger.error("Job started but could not inform exchange: {0}".format(e))
raise MessageProcessor.NackMessage