rabbitmq/vidispine_message_processor.py (126 lines of code) (raw):
from .MessageProcessor import MessageProcessor
from .job_notification import JobNotification
import logging
from django.conf import settings
from gnmvidispine.vs_item import VSItem
from gnmvidispine.vidispine_api import VSException
from gnm_deliverables.choices import DELIVERABLE_ASSET_TYPES, \
DELIVERABLE_ASSET_STATUS_NOT_INGESTED, \
DELIVERABLE_ASSET_STATUS_INGESTED, \
DELIVERABLE_ASSET_STATUS_INGEST_FAILED, DELIVERABLE_ASSET_STATUS_INGESTING, \
DELIVERABLE_ASSET_STATUS_TRANSCODED, \
DELIVERABLE_ASSET_STATUS_TRANSCODE_FAILED, DELIVERABLE_ASSET_STATUS_TRANSCODING
from rabbitmq.time_funcs import get_current_time
from gnm_deliverables.models import *
logger = logging.getLogger(__name__)
class VidispineMessageProcessor(MessageProcessor):
routing_key = "vidispine.job.*.*"
# see https://json-schema.org/learn/miscellaneous-examples.html for more details
schema = {
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"field": {
"type": "array",
"items": {
"$ref": "#/definitions/kvpair"
}
}
},
"definitions": {
"kvpair": {
"type": "object",
"required": ["key","value"],
"properties": {
"key": {"type": "string"},
"value": {"type": "string"}
}
}
}
}
DONT_TRANSCODE_THESE_TYPES = [DELIVERABLE_ASSET_TYPE_OTHER_MISCELLANEOUS,
DELIVERABLE_ASSET_TYPE_OTHER_PAC_FORMS,
DELIVERABLE_ASSET_TYPE_OTHER_POST_PRODUCTION_SCRIPT,
DELIVERABLE_ASSET_TYPE_OTHER_SUBTITLE
]
def get_item_metadata(self, item_id) -> (float, str):
duration_seconds=None
version=None
if item_id is not None:
try:
vs_item = VSItem(url=settings.VIDISPINE_URL,
user=settings.VIDISPINE_USER,
passwd=settings.VIDISPINE_PASSWORD)
vs_item.populate(item_id,specificFields=["durationSeconds","__version"])
version = vs_item.get("__version",allowArray=True)
if isinstance(version, list):
logger.warning("{0} has multiple versions: {1}, using the first".format(item_id, version))
version = version[0]
possibly_seconds = vs_item.get("durationSeconds")
if possibly_seconds is not None:
duration_seconds = float(possibly_seconds)
except ValueError:
logger.warning("{0}: duration_seconds value '{1}' could not be converted to float".format(item_id, vs_item.get("durationSeconds")))
except VSException as e:
logger.warning("Could not get extra metadata for {0} from Vidispine: {1}".format(item_id, str(e)))
if duration_seconds is not None:
logger.debug("Got duration as " + str(duration_seconds) + " seconds")
if version is not None:
logger.debug("Got version: " + version)
return duration_seconds, version
def valid_message_receive(self, exchange_name, routing_key, delivery_tag, body: dict):
"""
receives the validated vidispine json message.
:param exchange_name:
:param routing_key:
:param delivery_tag:
:param body:
:return:
"""
logger.debug("Got incoming message: " + str(body))
logger.debug("Exchange name: {0}".format(exchange_name))
logger.debug("Routing key: {0}".format(routing_key))
logger.debug("Delivery tag: {0}".format(delivery_tag))
try:
notification = JobNotification(body)
except Exception as e:
logger.warning("Incoming message lacked one or more required fields. {0}".format(e))
return
return self.handle_notification(notification, routing_key)
def handle_notification(self, notification:JobNotification, routing_key:str):
"""
takes a constructed JobNotification, works out what it means and performs the relevant actions
:param notification: JobNotification instance
:param routing_key: routing key with which this was sent
:return: none
"""
try:
asset = DeliverableAsset.objects.get(job_id=notification.jobId)
except DeliverableAsset.DoesNotExist:
logger.warning("Received a message for job {0}. Cannot find a matching asset.".format(notification.jobId))
return
if notification.status in ['FAILED_TOTAL', 'ABORTED_PENDING', 'ABORTED']:
if notification.type == 'TRANSCODE':
asset.status = DELIVERABLE_ASSET_STATUS_TRANSCODE_FAILED
else:
asset.status = DELIVERABLE_ASSET_STATUS_INGEST_FAILED
asset.ingest_complete_dt = get_current_time()
asset.save()
elif notification.status == 'READY' or notification.status == 'STARTED' or notification.status == 'VIDINET_JOB':
if notification.type == 'TRANSCODE':
asset.status = DELIVERABLE_ASSET_STATUS_TRANSCODING
else:
asset.status = DELIVERABLE_ASSET_STATUS_INGESTING
asset.save()
elif notification.status in ['FINISHED','FINISHED_WARNING']:
if notification.type == 'TRANSCODE':
asset.status = DELIVERABLE_ASSET_STATUS_TRANSCODED
duration_seconds, version = self.get_item_metadata(notification.itemId)
try:
asset.version = int(version)
except ValueError as e:
logger.warning("{0}: asset version '{1}' could not be converted into number".format(notification.itemId, version))
asset.duration_seconds = duration_seconds
asset.ingest_complete_dt = get_current_time()
else:
asset.online_item_id = notification.itemId
if asset.type in self.DONT_TRANSCODE_THESE_TYPES:
asset.status = DELIVERABLE_ASSET_STATUS_TRANSCODED
else:
asset.status = DELIVERABLE_ASSET_STATUS_INGESTED
if routing_key == 'vidispine.job.essence_version.stop':
try:
asset.create_proxy()
except Exception as e:
logger.exception(
"{0} for asset {1} in bundle {2}: could not create proxy due to:".format(
asset.online_item_id,
asset.id,
asset.deliverable.id),
exc_info=e)
asset.save()