# atomresponder/master_importer.py

from kinesisresponder.kinesis_responder import KinesisResponder
import json
import urllib.request, urllib.parse, urllib.error
from django.conf import settings
from .s3_mixin import S3Mixin, FileDoesNotExist
from .vs_mixin import VSMixin
import logging
from gnmvidispine.vs_item import VSItem, VSNotFound
from rabbitmq.models import LinkedProject
from datetime import datetime
import atomresponder.constants as const
import re
import pika
import time
import os
import atomresponder.message_schema as message_schema
import jsonschema

logger = logging.getLogger(__name__)

#Still need: holding image. This is more likely to come from the launch detection side than the atom side.
extract_extension = re.compile(r'^(?P<basename>.*)\.(?P<extension>[^\.]+)$')
multiple_underscore_re = re.compile(r'_{2,}')
make_filename_re = re.compile(r'[^\w\d\.]')


class MasterImportResponder(KinesisResponder, S3Mixin, VSMixin):
    def __init__(self, *args, **kwargs):
        super(MasterImportResponder, self).__init__(*args, **kwargs)
        self._pika_client = None
        #set up the exchange on startup. This also means that we terminate if we can't connect to the broker.
        if "CI" not in os.environ:
            (conn, channel) = self.setup_pika_channel()
            channel.close()
            conn.close()

    @staticmethod
    def setup_pika_channel() -> (pika.spec.Connection, pika.spec.Channel):
        conn = pika.BlockingConnection(pika.ConnectionParameters(
            host=settings.RABBITMQ_HOST,
            port=getattr(settings, "RABBITMQ_PORT", 5672),
            virtual_host=getattr(settings, "RABBITMQ_VHOST", "/"),
            credentials=pika.PlainCredentials(username=settings.RABBITMQ_USER, password=settings.RABBITMQ_PASSWORD),
            connection_attempts=getattr(settings, "RABBITMQ_CONNECTION_ATTEMPTS", 3),
            retry_delay=getattr(settings, "RABBITMQ_RETRY_DELAY", 3)
        ))
        channel = conn.channel()
        channel.exchange_declare(settings.RABBITMQ_EXCHANGE, exchange_type="topic", durable=True)
        channel.confirm_delivery()
        return conn, channel
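
    # For reference, a minimal sketch of the Django settings this class reads.
    # The names come from the settings lookups above; the values shown are
    # illustrative placeholders, not the production configuration:
    #
    #   RABBITMQ_HOST = "rabbitmq.local"
    #   RABBITMQ_PORT = 5672                  # optional, defaults to 5672
    #   RABBITMQ_VHOST = "/"                  # optional
    #   RABBITMQ_USER = "atomresponder"
    #   RABBITMQ_PASSWORD = "changeme"
    #   RABBITMQ_EXCHANGE = "atomresponder"
    #   RABBITMQ_CONNECTION_ATTEMPTS = 3      # optional
    #   RABBITMQ_RETRY_DELAY = 3              # optional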

    def update_pluto_record(self, item_id, job_id, content: dict, statinfo):
        (conn, channel) = self.setup_pika_channel()

        if 'type' not in content:
            logger.error("Content dictionary had no type information! Using video-upload")
            message_type = const.MESSAGE_TYPE_MEDIA
        else:
            message_type = content['type']
        routingkey = "atomresponder.atom.{}".format(message_type)

        #don't bother with a lookup if there is no project ID
        if "projectId" in content and content["projectId"] is not None:
            try:
                linked_project = LinkedProject.objects.get(project_id=int(content['projectId']))
                commission_id = linked_project.commission.id
            except ValueError:
                logger.error("Project ID {} does not convert to integer!".format(content['projectId']))
                commission_id = -1
            except KeyError:
                logger.error("Content has no projectId? Invalid message.")
                raise RuntimeError("Invalid message")
            except LinkedProject.DoesNotExist:
                commission_id = -1
        else:
            commission_id = None

        if statinfo is not None:
            statpart = {
                "size": statinfo.st_size,
                "atime": statinfo.st_atime,
                "mtime": statinfo.st_mtime,
                "ctime": statinfo.st_ctime
            }
        else:
            statpart = {}

        message_to_send = {
            **content,
            "itemId": item_id,
            "jobId": job_id,
            "commissionId": commission_id,
            **statpart
        }

        while True:
            try:
                logger.info("Updating exchange {} with routing-key {}...".format(settings.RABBITMQ_EXCHANGE, routingkey))
                channel.basic_publish(settings.RABBITMQ_EXCHANGE, routingkey, json.dumps(message_to_send).encode("UTF-8"))
                channel.close()
                conn.close()    #makes sure that everything gets flushed nicely
                break
            except pika.exceptions.UnroutableError:
                logger.error("Could not route message to broker, retrying in 3s...")
                time.sleep(3)
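
    # As an illustration (not captured from production traffic), a media update
    # published by update_pluto_record would look roughly like this, sent with
    # routing key "atomresponder.atom.video-upload". The item/job/commission
    # values here are made up; the real ones come from Vidispine and the database:
    #
    #   {
    #     ...the original message content, plus...
    #     "itemId": "VX-1234",
    #     "jobId": "VX-5678",
    #     "commissionId": 42,
    #     "size": 123456789,
    #     "atime": 1600000000.0,
    #     "mtime": 1600000000.0,
    #     "ctime": 1600000000.0
    #   }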

    def get_or_create_master_item(self, atomId: str, title: str, filename: str, project_id: int, user: str) -> (VSItem, bool):
        master_item = self.get_item_for_atomid(atomId)
        created = False

        if master_item is None:
            if title is None:
                raise ValueError("Title field not set for atom {0}.".format(atomId))
            if user is None:
                logger.warning("User field not set for atom {0}.".format(atomId))
                user_to_set = "unknown_user"
            else:
                user_to_set = user
            master_item = self.create_placeholder_for_atomid(atomId,
                                                             filename=filename,
                                                             title=title,
                                                             project_id=project_id,
                                                             user=user_to_set)
            logger.info("Created item {0} for atom {1}".format(master_item.name, atomId))
            created = True
        return master_item, created

    def process(self, record, approx_arrival, attempt=0):
        """
        Process a message from the kinesis stream. Each record is a JSON document which contains keys for atomId,
        s3Key and projectId.
        This will find an item with the given atom ID or create a new one, get a signed download URL from S3 for the
        media and then instruct Vidispine to import it.
        Rather than wait for the job to complete here, we return immediately and rely on receiving a message from VS
        when the job terminates.
        :param record: JSON document in the form of a string
        :param approx_arrival: approximate arrival time of the record on the stream
        :param attempt: optional integer showing how many times this has been retried
        :return:
        """
        from .media_atom import request_atom_resend, HttpError
        content = json.loads(record)
        try:
            message_schema.validate_message(content)
        except ValueError as e:
            logger.error("Incoming message '{}' was not valid: {}".format(record, e))
            return
        except jsonschema.ValidationError as e:
            logger.error("Incoming message '{}' was not valid: {}".format(record, e))
            return

        logger.info("Valid message of type {}: {}".format(content["type"], content))
        #We get several types of message on the stream: incoming media (and media resyncs), incoming PAC xml
        #and project (re-)assignments.
        if content['type'] == const.MESSAGE_TYPE_MEDIA or content['type'] == const.MESSAGE_TYPE_RESYNC_MEDIA:
            if 'user' in content:
                atom_user = content['user']
            else:
                atom_user = None
            if "projectId" in content and content["projectId"] is not None:
                project_id = content["projectId"]
            else:
                project_id = None
            master_item, created = self.get_or_create_master_item(content['atomId'],
                                                                  title=content['title'],
                                                                  filename=content['s3Key'],
                                                                  project_id=project_id,
                                                                  user=atom_user)
            return self.import_new_item(master_item, content)
        elif content['type'] == const.MESSAGE_TYPE_PAC:
            logger.info("Got PAC form data message")
            record = self.register_pac_xml(content)
            self.ingest_pac_xml(record)
            logger.info("PAC form data message complete")
        elif content['type'] == const.MESSAGE_TYPE_PROJECT_ASSIGNED:
            logger.info("Got project (re-)assignment message: {0}".format(content))
            master_item = self.get_item_for_atomid(content['atomId'])
            if master_item is not None:
                logger.info("Master item for atom already exists at {0}, assigning".format(master_item.name))
                self.update_pluto_record(master_item.name, None, content, None)
            else:
                logger.warning("No master item exists for atom {0}. Requesting a re-send from media atom tool".format(content['atomId']))
                try:
                    request_atom_resend(content['atomId'], settings.ATOM_TOOL_HOST, settings.ATOM_TOOL_SECRET)
                except HttpError as e:
                    if e.code == 404:
                        if attempt >= 10:
                            logger.error("{0}: still nothing after 10 attempts. Giving up.".format(content['atomId']))
                            raise
                        logger.warning("{0}: Media atom tool responded with a 404 on attempt {1}: {2}. Retry is NOT YET IMPLEMENTED.".format(content['atomId'], attempt, e.content))
                        # timed_request_resend.apply_async(args=(record, approx_arrival),
                        #                                  kwargs={'attempt': attempt+1},
                        #                                  countdown=60)
                    else:
                        logger.exception("{0}: Could not request resync".format(content['atomId']))
            logger.info("Project (re-)assignment complete")
        else:
            raise ValueError("Unrecognised message type: {0}".format(content['type']))
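
    # A minimal sketch of an incoming media message, for orientation. The key
    # names match what process() reads above; the values (and the exact spelling
    # of the "type" constant) are illustrative assumptions, not real traffic:
    #
    #   {
    #     "type": "video-upload",
    #     "atomId": "6c3f25e1-0000-0000-0000-000000000000",
    #     "s3Key": "uploads/myvideo.mp4",
    #     "projectId": "1234",
    #     "title": "My video title",
    #     "user": "someone@example.com"
    #   }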

    def register_pac_xml(self, content):
        """
        Start the import of new PAC data by registering it in the database.
        :param content: JSON message content as received from atom tool
        :return: the database model instance
        """
        from .models import PacFormXml
        (record, created) = PacFormXml.objects.get_or_create(atom_id=content['atomId'], defaults={'received': datetime.now()})
        if not created:
            logger.info("PAC form xml had already been delivered for {0}, over-writing".format(content['atomId']))
            record.completed = None
            record.status = "UNPROCESSED"
            record.last_error = ""

        record.pacdata_url = "s3://{bucket}/{path}".format(bucket=content['s3Bucket'], path=content['s3Path'])
        record.save()
        return record

    def check_for_old_finished_jobs(self, vs_item_id):
        from .models import ImportJob
        jobs = ImportJob.objects.filter(item_id=vs_item_id).filter(status='FINISHED').count()
        return jobs > 0

    def check_key(self, key, vs_item_id):
        from .models import ImportJob
        jobs = ImportJob.objects.filter(item_id=vs_item_id).filter(s3_path=key).count()
        return jobs > 0

    def check_for_processing(self, vs_item_id):
        from .models import ImportJob
        jobs = ImportJob.objects.filter(item_id=vs_item_id).filter(processing=True).count()
        return jobs > 0
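
    # Taken together, the three checks above give import_new_item (below) a
    # simple dedup rule, roughly:
    #
    #   already_done  = check_for_old_finished_jobs(item) and check_key(key, item)
    #   still_running = check_for_processing(item)
    #   # the import is skipped if either already_done or still_running is set
    #
    # (pseudocode for orientation only; the real logic lives in import_new_item)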

    def import_new_item(self, master_item: VSItem, content):
        from .models import ImportJob, PacFormXml
        from .pac_xml import PacXmlProcessor
        from mock import MagicMock
        if not isinstance(master_item, VSItem) and not isinstance(master_item, MagicMock):
            raise TypeError    #for intellij
        from kinesisresponder.sentry import inform_sentry

        vs_item_id = master_item.get("itemId")
        if vs_item_id is None:
            vs_item_id = master_item.name

        old_finished_jobs = self.check_for_old_finished_jobs(vs_item_id)
        old_key = self.check_key(content['s3Key'], vs_item_id)

        if old_finished_jobs and old_key:
            logger.info('A job for item {0} has already been successfully completed. Aborting.'.format(vs_item_id))
            inform_sentry('A job for item {0} has already been successfully completed. Aborting.'.format(vs_item_id), {
                "master_item": master_item,
                "content": content,
            })
            return

        processing_job = self.check_for_processing(vs_item_id)

        if processing_job:
            logger.info('Job for item {0} already in progress. Aborting.'.format(vs_item_id))
            inform_sentry('Job for item {0} already in progress. Aborting.'.format(vs_item_id), {
                "master_item": master_item,
                "content": content,
            })
            return

        safe_title = content.get('title', '(unknown title)').encode("UTF-8", "backslashreplace").decode("UTF-8")
        #Using a signed URL is preferred, but right now VS seems to have trouble ingesting it.
        #So, we download instead and ingest that. get_s3_signed_url is left in to make it simple to switch back.
        #download_url = self.get_s3_signed_url(bucket=settings.ATOM_RESPONDER_DOWNLOAD_BUCKET, key=content['s3Key'])
        downloaded_path = self.download_to_local_location(bucket=settings.ATOM_RESPONDER_DOWNLOAD_BUCKET,
                                                          key=content['s3Key'],
                                                          #this is converted to a safe filename within download_to_local_location
                                                          filename=content.get('title', None))  #filename=None => use s3key instead

        download_url = "file://" + urllib.parse.quote(downloaded_path)

        logger.info("{n}: Ingesting atom with title '{0}' from media atom with ID {1}".format(safe_title, content['atomId'], n=master_item.name))
        logger.info("{n}: Download URL is {0}".format(download_url, n=master_item.name))
        job_result = master_item.import_to_shape(uri=download_url,
                                                 essence=True,
                                                 shape_tag=getattr(settings, "ATOM_RESPONDER_SHAPE_TAG", "lowres"),
                                                 priority=getattr(settings, "ATOM_RESPONDER_IMPORT_PRIORITY", "HIGH"),
                                                 jobMetadata={'gnm_source': 'media_atom'},
                                                 )
        logger.info("{0} Import job is at ID {1}".format(vs_item_id, job_result.name))

        #make a note of the record. This is to link it up with Vidispine's response message.
        record = ImportJob(item_id=vs_item_id,
                           job_id=job_result.name,
                           status='STARTED',
                           started_at=datetime.now(),
                           user_email=content.get('user', "Unknown user"),
                           atom_id=content['atomId'],
                           atom_title=content.get('title', "Unknown title"),
                           s3_path=content['s3Key'],
                           processing=True)
        previous_attempt = record.previous_attempt()
        if previous_attempt:
            record.retry_number = previous_attempt.retry_number + 1
            logger.info("{0} Import job is retry number {1}".format(vs_item_id, record.retry_number))
        record.save()

        statinfo = os.stat(downloaded_path)
        self.update_pluto_record(vs_item_id, job_result.name, content, statinfo)

        try:
            logger.info("{n}: Looking for PAC info that has been already registered".format(n=vs_item_id))
            pac_entry = PacFormXml.objects.get(atom_id=content['atomId'])
            logger.info("{n}: Found PAC form information at {0}".format(pac_entry.pacdata_url, n=vs_item_id))
            proc = PacXmlProcessor(self.role_name, self.session_name)
            proc.link_to_item(pac_entry, master_item)
        except PacFormXml.DoesNotExist:
            logger.info("{n}: No PAC form information has yet arrived".format(n=vs_item_id))

    def ingest_pac_xml(self, pac_xml_record):
        """
        Master process to perform import of pac data
        :param pac_xml_record: instance of PacFormXml model
        :return:
        """
        from .pac_xml import PacXmlProcessor

        vsitem = self.get_item_for_atomid(pac_xml_record.atom_id)
        if vsitem is None:
            logger.warning("No item could be found for atom ID {0}, waiting for it to arrive".format(pac_xml_record.atom_id))
            return

        proc = PacXmlProcessor(self.role_name, self.session_name)
        #this process will call out to Pluto to do the linkup once the data has been received
        return proc.link_to_item(pac_xml_record, vsitem)
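
# End-to-end flow, for orientation (a summary of the code above, not extra behaviour):
#
#   1. A JSON record arrives on the Kinesis stream and is validated against the message schema.
#   2. For media messages, a master item is found (or a placeholder created) for the atom ID.
#   3. The media is downloaded from S3 and a Vidispine import job is started against it;
#      completion is signalled later by a message from Vidispine, not awaited here.
#   4. An ImportJob row records the job so the Vidispine response can be linked back,
#      and the update is published to RabbitMQ for Pluto via update_pluto_record.
#   5. Any PAC form xml already registered for the atom is linked to the item.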