in atomresponder/master_importer.py [0:0]
def import_new_item(self, master_item:VSItem, content):
    """
    Kick off a Vidispine import of a media-atom asset onto an existing master item,
    recording the job in the local ImportJob table and linking any PAC form data
    that has already arrived for the atom.

    Deduplication: aborts early (returning None) if a job for this item+s3Key has
    already completed successfully, or if one is currently in progress.

    :param master_item: VSItem representing the master to import onto (MagicMock
                        accepted so unit tests can pass a mock through the guard)
    :param content: dict from the atom message; must contain 'atomId' and 's3Key',
                    and may contain 'title' and 'user'
    :return: None in all cases (early-return on dedupe, falls through on success)
    """
    from .models import ImportJob, PacFormXml
    from .pac_xml import PacXmlProcessor
    from mock import MagicMock
    # Type-narrowing guard kept for the IDE's benefit; MagicMock is whitelisted
    # so mocked items used in tests are not rejected.
    if not isinstance(master_item, VSItem) and not isinstance(master_item, MagicMock):
        raise TypeError("master_item must be a VSItem (or a MagicMock in tests)")
    from kinesisresponder.sentry import inform_sentry
    # The item id is normally in the 'itemId' metadata field; fall back to the
    # VSItem's name if that field is absent.
    vs_item_id = master_item.get("itemId")
    if vs_item_id is None:
        vs_item_id = master_item.name
    # Dedupe 1: a job for this item AND this exact s3 key already finished OK.
    old_finished_jobs = self.check_for_old_finished_jobs(vs_item_id)
    old_key = self.check_key(content['s3Key'], vs_item_id)
    if old_finished_jobs is True and old_key is True:
        logger.info('A job for item {0} has already been successfully completed. Aborting.'.format(vs_item_id))
        inform_sentry('A job for item {0} has already been successfully completed. Aborting.'.format(vs_item_id), {
            "master_item": master_item,
            "content": content,
        })
        return
    # Dedupe 2: a job for this item is still running.
    processing_job = self.check_for_processing(vs_item_id)
    if processing_job is True:
        logger.info('Job for item {0} already in progress. Aborting.'.format(vs_item_id))
        inform_sentry('Job for item {0} already in progress. Aborting.'.format(vs_item_id), {
            "master_item": master_item,
            "content": content,
        })
        return
    # Sanitize the title for logging. FIX: the error handler is called
    # "backslashreplace" — the previous spelling "backslashescape" is not a
    # registered handler and would raise LookupError on unencodable characters
    # (e.g. lone surrogates) instead of escaping them.
    safe_title = content.get('title','(unknown title)').encode("UTF-8","backslashreplace").decode("UTF-8")
    #using a signed URL is preferred, but right now VS seems to have trouble ingesting it.
    #so, we download instead and ingest that. get_s3_signed_url is left in to make it simple to switch back
    #download_url = self.get_s3_signed_url(bucket=settings.ATOM_RESPONDER_DOWNLOAD_BUCKET, key=content['s3Key'])
    downloaded_path = self.download_to_local_location(bucket=settings.ATOM_RESPONDER_DOWNLOAD_BUCKET,
                                                     key=content['s3Key'],
                                                     #this is converted to a safe filename within download_to_local_location
                                                     filename=content.get('title', None)) #filename=None => use s3key instead
    # percent-encode the local path so spaces etc. survive as a file:// URI
    download_url = "file://" + urllib.parse.quote(downloaded_path)
    logger.info("{n}: Ingesting atom with title '{0}' from media atom with ID {1}".format(safe_title,
                                                                                          content['atomId'],
                                                                                          n=master_item.name))
    logger.info("{n}: Download URL is {0}".format(download_url, n=master_item.name))
    job_result = master_item.import_to_shape(uri=download_url,
                                             essence=True,
                                             shape_tag=getattr(settings,"ATOM_RESPONDER_SHAPE_TAG","lowres"),
                                             priority=getattr(settings,"ATOM_RESPONDER_IMPORT_PRIORITY","HIGH"),
                                             jobMetadata={'gnm_source': 'media_atom'},
                                             )
    logger.info("{0} Import job is at ID {1}".format(vs_item_id, job_result.name))
    #make a note of the record. This is to link it up with Vidispine's response message.
    record = ImportJob(item_id=vs_item_id,
                       job_id=job_result.name,
                       status='STARTED',
                       started_at=datetime.now(),
                       user_email=content.get('user',"Unknown user"),
                       atom_id=content['atomId'],
                       atom_title=content.get('title', "Unknown title"),
                       s3_path=content['s3Key'],
                       processing=True)
    # If this atom has been attempted before, carry the retry counter forward.
    previous_attempt = record.previous_attempt()
    if previous_attempt:
        record.retry_number = previous_attempt.retry_number+1
        logger.info("{0} Import job is retry number {1}".format(vs_item_id, record.retry_number))
    record.save()
    statinfo = os.stat(downloaded_path)
    self.update_pluto_record(vs_item_id, job_result.name, content, statinfo)
    # PAC data may arrive before or after the media; if it is already here,
    # link it to the item now. Otherwise it gets linked when it arrives.
    try:
        logger.info("{n}: Looking for PAC info that has been already registered".format(n=vs_item_id))
        pac_entry = PacFormXml.objects.get(atom_id=content['atomId'])
        logger.info("{n}: Found PAC form information at {0}".format(pac_entry.pacdata_url,n=vs_item_id))
        proc = PacXmlProcessor(self.role_name, self.session_name)
        proc.link_to_item(pac_entry, master_item)
    except PacFormXml.DoesNotExist:
        logger.info("{n}: No PAC form information has yet arrived".format(n=vs_item_id))