gnm_deliverables/management/commands/import_master_migration_data.py (110 lines of code) (raw):

from django.core.management.base import BaseCommand import yaml import logging from pprint import pprint from gnm_deliverables.models import * import re logger = logging.getLogger(__name__) class Command(BaseCommand): """ management command to import a yaml dump from the master migration process """ help = "import a yaml dump from the master migration process" def add_arguments(self, parser): parser.add_argument("path",type=str,help="path to the yaml file to import") parser.add_argument("--commissions",type=str,help="path to a yaml file containing project_id->commission_id relations") def load_content(self, fromFile:str) -> list: with open(fromFile, "r") as f: loaded_content = yaml.safe_load(f.read()) if not isinstance(loaded_content, list): pprint(loaded_content) raise TypeError("File content should be a list, not a {}".format(loaded_content.__class__.__name__)) return loaded_content def load_commissions_table(self, fromFile:str) -> dict: with open(fromFile, "r") as f: loaded_content = yaml.safe_load(f.read()) if not isinstance(loaded_content, dict): pprint(loaded_content) raise TypeError("File content should be an object, not a {}".format(loaded_content.__class__.__name__)) return loaded_content def get_parent_bundle(self, parent_project_id:int, commissions_table:dict) -> (Deliverable, bool): try: return Deliverable.objects.get_or_create(pluto_core_project_id=parent_project_id, defaults={ "commission_id": commissions_table.get(parent_project_id, 3657), "pluto_core_project_id": parent_project_id, "name": "Legacy masters for {}".format(parent_project_id), }) except Deliverable.MultipleObjectsReturned: results = Deliverable.objects.filter(pluto_core_project_id=parent_project_id) return results[0], False numbers_regex = re.compile(r'^\d+$') @staticmethod def numbers_only(value:str) -> str: if Command.numbers_regex.match(value): return value else: return "" @staticmethod def numbers_only_list(entries:list): mapped_entries = [Command.numbers_only(x) for x in entries] return list(filter(lambda entry: len(entry)>0,mapped_entries)) def handle(self, *args, **options): commissions_table = self.load_commissions_table(options["commissions"]) logger.info("Loaded {} project->commission relations from {}".format(len(commissions_table), options["commissions"])) content = self.load_content(options["path"]) logger.info("Loaded {} items from {}".format(len(content), options["path"])) for entry in content: parent_project_id = entry["deliverable"] del entry["deliverable"] #remove blank entries from some fields - e.g. date-time and uuids, things that need parsing. #go returns a blank when we actually want a null for name in ["access_dt", "modified_dt", "changed_dt", "job_id", "ingest_complete_dt", "atom_id"]: if name in entry and (entry[name]=="" or entry[name]=="None"): del entry[name] # project_id = models.CharField(null=True, blank=True, max_length=61) # commission_id = models.BigIntegerField(null=False, blank=False, db_index=True) # pluto_core_project_id = models.BigIntegerField(null=False, blank=False, db_index=True, unique=True) # name = models.CharField(null=False, blank=False, unique=True, max_length=255) # created = models.DateTimeField(null=False, blank=False, auto_now_add=True) parent_deliverable_bundle, parent_bundle_created = self.get_parent_bundle(parent_project_id, commissions_table) if parent_bundle_created: logger.info("created new parent bundle for project id {}".format(parent_project_id)) parent_deliverable_bundle.save() else: logger.info("using existing bundle {}".format(parent_deliverable_bundle.id)) if "gnm_website_master" in entry and entry["gnm_website_master"] is not None: if entry["gnm_website_master"].get("media_atom_id")=="" or entry["gnm_website_master"].get("media_atom_id")=="None": del entry["gnm_website_master"]["media_atom_id"] for name in ["publication_date", "etag"]: if entry["gnm_website_master"].get(name) == "": del entry["gnm_website_master"][name] entry["gnm_website_master"] = GNMWebsite(**entry["gnm_website_master"]) entry["gnm_website_master"].save() else: entry["gnm_website_master"] = None if "youtube_master" in entry and entry["youtube_master"] is not None: for name in ["publication_date", "etag"]: if entry["youtube_master"].get(name) == "": del entry["youtube_master"][name] if "youtube_categories" in entry["youtube_master"]: entry["youtube_master"]["youtube_categories"] = Command.numbers_only_list(entry["youtube_master"]["youtube_categories"]) entry["youtube_master"] = Youtube(**entry["youtube_master"]) entry["youtube_master"].save() else: entry["youtube_master"] = None if "mainstream_master" in entry and entry["mainstream_master"] is not None: for name in ["publication_date", "etag"]: if entry["mainstream_master"].get(name) == "": del entry["mainstream_master"][name] entry["mainstream_master"] = Mainstream(**entry["mainstream_master"]) entry["mainstream_master"].save() else: entry["mainstream_master"] = None if "DailyMotion_master" in entry and entry["DailyMotion_master"] is not None: for name in ["publication_date", "etag"]: if entry["DailyMotion_master"].get(name) == "": del entry["DailyMotion_master"][name] entry["DailyMotion_master"] = DailyMotion(**entry["DailyMotion_master"]) entry["DailyMotion_master"].save() else: entry["DailyMotion_master"] = None newrec = DeliverableAsset(**entry) legacy_item_id = newrec.online_item_id #check for existing items existing_item_check = DeliverableAsset.objects.filter(absolute_path=newrec.absolute_path, size=newrec.size,filename=newrec.filename).count() if existing_item_check>0: logger.info("Legacy item {} already exists with {} copies, not duplicating".format(legacy_item_id, existing_item_check)) continue newrec.online_item_id = None newrec.deliverable = parent_deliverable_bundle newrec.save() logger.info("Legacy item {} imported to {}".format(legacy_item_id, newrec.pk))