rabbitmq/AssetUpdatedProcessor.py (49 lines of code) (raw):

from .MessageProcessor import MessageProcessor from gnm_deliverables.models import DeliverableAsset from .time_funcs import get_current_time import logging import gnm_deliverables.choices as choices from datetime import datetime logger = logging.getLogger(__name__) class AssetUpdatedProcessor(MessageProcessor): """ this class processes messages from our own source, in order to seperate side-effects from main effects """ routing_key = "deliverables.deliverableasset.update" #there is something weird with the DeliverableAsset serializer, so fall back to standard jsonschema validation schema = { "type": "object", "properties": { "id": {"type": "number"}, "type": {"type": "number"}, "filename": {"type": "string"}, "status": {"type": "number"}, "deliverable": {"type":"number"} } } def valid_message_receive(self, exchange_name, routing_key, delivery_tag, body): # there is something strange with the serializer, if we just build an object from body we miss out # most of the fields. if "id" in body: asset = DeliverableAsset.objects.get(id=body["id"]) logger.debug("loaded asset {0}".format(asset.__dict__)) else: logger.error("Incoming message had no ID. Offending data was: {0}".format(body)) return if asset.ingest_complete_dt is not None: completed_ago = get_current_time() - asset.ingest_complete_dt else: completed_ago = None if asset.file_removed_dt is None: #if it happened more than 5 minutes ago then it's probably an old notification if completed_ago is not None and completed_ago.days>1: logger.warning(("Received notification that {name} (asset {assetid} on bundle {bundleid}) was " + "updated where the source file is not removed but ingest completed {ago} ago. " + "This may be a false notification, if not then delete the file {abspath} manually").format( name=str(asset), assetid=asset.id, bundleid=asset.deliverable_id, ago=completed_ago, abspath=asset.absolute_path, )) if asset.status==choices.DELIVERABLE_ASSET_STATUS_TRANSCODED: logger.info("{0} ({1}) completed ingest & transcode {2} ago; removing file if it exists".format(str(asset), asset.absolute_path, completed_ago)) asset.remove_file() elif asset.status==choices.DELIVERABLE_ASSET_STATUS_INGESTED: logger.info("{0} ({1}) completed ingest with possible ongoing transcode {2} ago; removing file if it exists".format(str(asset), asset.absolute_path, completed_ago)) asset.remove_file() else: logger.debug("{0} updated item was not in a completed state".format(str(asset)))