# rabbitmq/AtomResponderProcessor.py (106 lines of code) (raw):
from .MessageProcessor import MessageProcessor
from .serializers import AtomMessageSerializer
from .models import AtomResponderMessage
from gnm_deliverables.models import Deliverable, DeliverableAsset
from gnmvidispine.vs_item import VSItem
from django.conf import settings
import gnm_deliverables.choices as AssetChoices
import rabbitmq.constants as const
import logging
import pytz
from datetime import datetime
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class AtomResponderProcessor(MessageProcessor):
    """
    Message processor bound to the "atomresponder.atom.#" routing key.
    It maintains DeliverableAsset records, and the Deliverable bundles that hold them, in response to
    media, resync-media and project-assigned messages.
    """
    routing_key = "atomresponder.atom.#"
    serializer = AtomMessageSerializer

    def get_or_create_bundle(self, projectid: str, commissionId: int) -> Deliverable:
        """
        Looks up a deliverable bundle for the given (pluto-core) projectID. If none is found
        then a new one is created for the given project and commission IDs
        :param projectid: project ID to create a bundle for. Must be convertible to an integer.
        :param commissionId: commission relating to that project. Only used when creating a new bundle.
        :return: a Deliverable model instance
        :raises RuntimeError: if projectid is missing or cannot be converted to an integer
        """
        # Convert once, so that the lookup and the newly created record both use the same integer value.
        # int(None) raises TypeError rather than ValueError; both mean "no usable project id".
        try:
            numeric_project_id = int(projectid)
        except (TypeError, ValueError) as err:
            logger.error("ProjectId {} is not a number. Maybe it's a Vidispine project?".format(projectid))
            raise RuntimeError("Invalid project id") from err

        try:
            bundle = Deliverable.objects.get(pluto_core_project_id=numeric_project_id)
        except Deliverable.DoesNotExist:
            bundle = Deliverable(
                commission_id=commissionId,
                pluto_core_project_id=numeric_project_id,
                name="Deliverables for {}".format(projectid)
            )
            bundle.save()
        return bundle

    def get_or_create_unattached_bundle(self) -> Deliverable:
        """
        Gets the special bundle for unattached projects, or creates it if it does not exist.
        The bundle is identified by a project id of -1 and is created with a commission id of -1.
        :return: a Deliverable model instance
        """
        try:
            bundle = Deliverable.objects.get(pluto_core_project_id=-1)
        except Deliverable.DoesNotExist:
            bundle = Deliverable(
                commission_id=-1,
                pluto_core_project_id=-1,
                name="Unattached media atom masters"
            )
            bundle.save()
        return bundle

    def get_or_create_record(self, atomid: str, projectid: str, commissionId: int) -> (DeliverableAsset, bool):
        """
        tries to look up a pre-existing asset with the same atom id.
        if that can't be found, tries to find a Deliverable bundle for the given project
        if that can't be found, then it creates a deliverable bundle from the provided information and saves it to the db
        once a bundle is available, it creates a new DeliverableAsset and returns it WITHOUT saving
        :param atomid: the atom uuid
        :param projectid: (string representation of) the numeric projectlocker project id, or None to use the
         special "unattached" bundle
        :param commissionId: the numeric commission id
        :return: a tuple of (DeliverableAsset, created) where `created` is True if the asset was newly built
         (and is therefore not yet saved) and False if it was found in the database
        :raises RuntimeError: if a bundle is needed and projectid is not numeric
        """
        try:
            asset = DeliverableAsset.objects.get(atom_id=atomid)
            logger.info("Found pre-existing asset id {} for atom id {}".format(asset.pk, atomid))
            return asset, False
        except DeliverableAsset.DoesNotExist:
            pass

        ##if we get here, there is no DeliverableAsset attached to the given id
        logger.info("No pre-existing asset for atom id {}, creating one".format(atomid))
        if projectid is None:
            bundle = self.get_or_create_unattached_bundle()
        else:
            bundle = self.get_or_create_bundle(projectid, commissionId)

        # datetime.now() with no argument is naive *local* time; labelling that as UTC gives the wrong instant
        # on any host whose clock is not set to UTC. Ask for the current time in UTC directly instead.
        timestamp = datetime.now(pytz.utc).isoformat()
        asset = DeliverableAsset(
            type=AssetChoices.DELIVERABLE_ASSET_TYPE_VIDEO_PUBLISHED_ATOM_FILE,
            atom_id=atomid,
            deliverable=bundle,
            changed_dt=timestamp,
        )
        return asset, True

    def reassign_project(self, asset: DeliverableAsset, projectId: str, commissionId: int):
        """
        Moves the given asset into the bundle for the given project (creating that bundle if necessary)
        and saves the asset.
        :param asset: the DeliverableAsset to move
        :param projectId: (string representation of) the numeric project id to move it to
        :param commissionId: commission id, only used if a new bundle has to be created
        :return: None
        :raises RuntimeError: if projectId is missing or not numeric
        """
        logger.info("Reassigning project of {} to id {}".format(str(asset), projectId))
        new_bundle = self.get_or_create_bundle(projectId, commissionId)
        asset.deliverable = new_bundle
        asset.save()

    def set_vs_metadata(self, asset: DeliverableAsset):
        """
        Writes the deliverable metadata group (atom id, bundle id and asset id) onto the Vidispine item
        referenced by the asset's online_item_id. If the asset has no online item id then this is logged
        and nothing else is done.
        :param asset: the DeliverableAsset to take the values from. It is read for `id` and `deliverable.id`,
         so it should have been saved already.
        :return: None
        """
        if asset.online_item_id is None:
            logger.info("Deliverable asset {} has no online item id, can't set metadata".format(str(asset)))
            return

        logger.info("Setting deliverables metadata on {0} for {1}".format(asset.online_item_id, str(asset)))
        item = VSItem(url=settings.VIDISPINE_URL, user=settings.VIDISPINE_USER, passwd=settings.VIDISPINE_PASSWORD)
        # point the VSItem at the existing item by assigning its id
        item.name = asset.online_item_id

        builder = item.get_metadata_builder()
        builder.addGroup(const.GROUP_GNM_DELIVERABLE, {
            const.GNM_DELIVERABLE_ATOM_ID: asset.atom_id,
            const.GNM_DELIVERABLE_BUNDLE: asset.deliverable.id,
            const.GNM_DELIVERABLE_ID: asset.id
        })
        builder.commit()

    def valid_message_receive(self, exchange_name, routing_key, delivery_tag, body):
        """
        handles the incoming message
        :param exchange_name: not used by this handler
        :param routing_key: not used by this handler
        :param delivery_tag: not used by this handler
        :param body: message content; expanded as keyword arguments into an AtomResponderMessage
        :return: None
        :raises ValueError: if the message type is not recognised
        :raises RuntimeError: if a bundle has to be looked up or created and the project id is not numeric
        """
        msg = AtomResponderMessage(**body)

        if msg.type in (const.MESSAGE_TYPE_MEDIA, const.MESSAGE_TYPE_RESYNC_MEDIA):
            logger.info("Received notification of a master {0} at item {1}".format(msg.title, msg.itemId))
            (asset, created) = self.get_or_create_record(msg.atomId, msg.projectId, msg.commissionId)
            asset.online_item_id = msg.itemId
            asset.job_id = msg.jobId  ##once we save this value, we can process the notifications when the job completes
            asset.size = msg.size
            asset.filename = msg.title
            if created:
                asset.status = AssetChoices.DELIVERABLE_ASSET_STATUS_INGESTING  #FIXME: it might not be this state?
            asset.save()

            # only done for new records, and only after save() because the metadata includes the asset's own id.
            # a failure here is logged but must not fail the message, the record itself is already saved.
            if created:
                try:
                    self.set_vs_metadata(asset)
                except Exception as e:
                    logger.exception("Could not update Vidispne metadata on {}: ".format(asset.online_item_id), exc_info=e)
        elif msg.type == const.MESSAGE_TYPE_PAC:
            logger.info("PAC messages not implemented yet")
        elif msg.type == const.MESSAGE_TYPE_PROJECT_ASSIGNED:
            (asset, created) = self.get_or_create_record(msg.atomId, msg.projectId, msg.commissionId)
            if created:
                logger.warning("Strange, got a project re-assignment message for something that does not exist yet")
            # reassign_project saves the asset, so a freshly created record is persisted here too
            self.reassign_project(asset, msg.projectId, msg.commissionId)
        else:
            raise ValueError("Did not recognise message type {}".format(msg.type))