rabbitmq/VidispineMessageProcessor.py (83 lines of code) (raw):

from .MessageProcessor import MessageProcessor from .job_notification import JobNotification import logging from atomresponder.models import ImportJob from kinesisresponder.sentry import inform_sentry_exception from .transcode_check import check_for_broken_proxy, delete_existing_proxy, transcode_proxy from datetime import datetime import pytz from django.conf import settings logger = logging.getLogger(__name__) time_zone: str = getattr(settings,"TIME_ZONE", "UTC") class VidispineMessageProcessor(MessageProcessor): routing_key = "vidispine.job.essence_version.stop" # see https://json-schema.org/learn/miscellaneous-examples.html for more details schema = { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "properties": { "field": { "type": "array", "items": { "$ref": "#/definitions/kvpair" } } }, "definitions": { "kvpair": { "type": "object", "required": ["key","value"], "properties": { "key": {"type": "string"}, "value": {"type": "string"} } } } } def valid_message_receive(self, exchange_name, routing_key, delivery_tag, body: dict): """ receives the validated vidispine json message. :param exchange_name: :param routing_key: :param delivery_tag: :param body: :return: """ logger.debug("got incoming message: ", body) notification = JobNotification(body) importjob = ImportJob.objects.get(job_id=notification.jobId) importjob.status = notification.status importjob.processing = False importjob.completed_at = datetime.now(tz=pytz.timezone(time_zone)) importjob.save() if importjob.is_failed(): VidispineMessageProcessor.handle_failed_job(importjob) else: try: logger.info("{0}: Checking for broken proxy".format(importjob.item_id)) should_regen, shape_id = check_for_broken_proxy(importjob.item_id) if should_regen: logger.info("{0}: Proxy needs regen. Existing shape id is {1}".format(importjob.item_id, shape_id)) if shape_id is not None: logger.info("{0}: Deleting invalid proxy".format(importjob.item_id)) delete_existing_proxy(importjob.item_id, shape_id) transcode_proxy(importjob.item_id, "lowres") else: logger.info("{0}: Proxy is OK".format(importjob.item_id)) except Exception as e: logger.exception("{0}: Could not do proxy check: ", exc_info=e) inform_sentry_exception() importjob.save() @staticmethod def handle_failed_job(import_job): """ If the job failed, request a resend :param import_job: ImportJob model instance representing the failed job :return: """ from atomresponder.media_atom import request_atom_resend from kinesisresponder.sentry import inform_sentry_exception max_retries = getattr(settings, "MAX_IMPORT_RETRIES", 10) logger.info("{0} ({1}): failed on attempt {2}".format(import_job.item_id, import_job.atom_id, import_job.retry_number)) if import_job.retry_number > max_retries: logger.error("{0}: Have already retried {1} times, giving up".format(import_job.atom_id, import_job.retry_number)) import_job.completed_at = datetime.now(tz=pytz.timezone(time_zone)) import_job.status = "FAILED_TOTAL" import_job.processing = False import_job.save() return import_job.retry_number += 1 import_job.save() try: logger.info("Requesting resend of atom {0}".format(import_job.atom_id)) request_atom_resend(import_job.atom_id, settings.ATOM_TOOL_HOST, settings.ATOM_TOOL_SECRET) logger.info("Resend of atom {0} done".format(import_job.atom_id)) except Exception as e: logger.error(e) inform_sentry_exception()