atomresponder/tasks.py:
# import celery
# from celery.decorators import periodic_task
# from celery import shared_task
# from celery.schedules import crontab
# from datetime import datetime, timedelta
# import logging
# from django.db.models import Q
# from django.conf import settings
# import traceback
#
# logger = logging.getLogger(__name__)
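#
# NOTE: celery.decorators.periodic_task was removed in Celery 4+, so the decorators used
# below no longer exist on a modern Celery. A minimal sketch of how the same schedules
# could be registered via beat_schedule if this module were re-enabled; the "portal.celery"
# app module and the dotted task names are assumptions, not taken from this repo:
#
#     from celery.schedules import crontab
#     from portal.celery import app  # hypothetical Celery app module
#
#     app.conf.beat_schedule = {
#         "cleanup-old-importjobs": {
#             "task": "atomresponder.tasks.cleanup_old_importjobs",  # assumed dotted path
#             "schedule": crontab(minute=23, hour=23),
#         },
#         "cleanup-s3-files": {
#             "task": "atomresponder.tasks.cleanup_s3_files",
#             "schedule": crontab(minute=31),
#         },
#     }
#
# with each task then declared as a plain @shared_task.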
#
#
# @periodic_task(run_every=crontab(minute=23,hour=23))
# def cleanup_old_importjobs():
# """
# Scheduled task to remove all finished entries older than 30 days from the database
# :return:
# """
# from .models import ImportJob
# from portal.plugins.kinesisresponder.sentry import inform_sentry_exception
# try:
# qs = ImportJob.objects\
# .filter(Q(status='FINISHED') | Q(status='FINISHED_WARNING'))\
# .filter(completed_at__lte=datetime.now()-timedelta(days=30))
#
# logger.info("Cleaning out {0} import jobs".format(qs.count()))
# qs.delete()
# logger.info("Done")
# except Exception:
# logger.error(traceback.format_exc())
# inform_sentry_exception()
#
#
# def delete_from_s3(conn, record):
# """
# Deletes the raw media from the specified ImportJob record from S3
# :param conn: Boto s3 connection object
# :param record: ImportJob record
# :return: Boolean
# """
# from .models import ImportJob
# from portal.plugins.kinesisresponder.sentry import inform_sentry_exception
#
# if not isinstance(record, ImportJob): raise TypeError
# if record.s3_path is None:
# return True
#
# bucket = conn.get_bucket(settings.ATOM_RESPONDER_DOWNLOAD_BUCKET)
# try:
# bucket.delete_key(record.s3_path)
# record.s3_path = None
# record.save()
# return True
# except Exception as e:
# #don't break the loop if it fails
# logger.error("Unable to delete {0} from record {1}: {2}".format(record.s3_path, str(record), str(e)))
# logger.error(traceback.format_exc())
# inform_sentry_exception({
# "record": record.__dict__
# })
# return False
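#
# The deletes here use the legacy boto API (get_bucket / delete_key). A rough boto3
# equivalent of the same delete step, assuming the same bucket setting and record
# field, would be:
#
#     import boto3
#
#     s3 = boto3.client("s3")
#     s3.delete_object(Bucket=settings.ATOM_RESPONDER_DOWNLOAD_BUCKET, Key=record.s3_path)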
#
#
# @periodic_task(run_every=crontab(minute=31))
# def cleanup_s3_files():
# """
# Scheduled task to delete the original media for successfully imported jobs
# :return:
# """
# from .models import ImportJob
# from .master_importer import S3Mixin
# s3_connector = S3Mixin(settings.ATOM_RESPONDER_ROLE_NAME, "AutoDeleteSession")
#
# qs = ImportJob.objects\
# .filter(Q(status='FINISHED') | Q(status='FINISHED_WARNING'))\
# .filter(completed_at__lte=datetime.now()-timedelta(days=1))
#
# conn = s3_connector.get_s3_connection()
# logger.info("Removing {0} job files from s3 bucket".format(qs.count()))
# results = [delete_from_s3(conn, record) for record in qs]
#
# succeeded = len([result for result in results if result])
# failed = len([result for result in results if not result])
# logger.info("Cleanup completed, removed {0} files, {1} failed".format(succeeded, failed))
#
#
# @periodic_task(run_every=crontab(minute=45))
# def check_unprocessed_pacxml():
# """
# Scheduled task to check if any unprocessed pac forms have "fallen through the cracks"
# :return:
# """
# from .models import PacFormXml
# from .pac_xml import PacXmlProcessor
# from django.conf import settings
# from .vs_mixin import VSMixin
#
# role_name = settings.ATOM_RESPONDER_ROLE_NAME
# session_name = "GNMAtomResponderTimed"
#
# vs = VSMixin()
# proc = PacXmlProcessor(role_name,session_name)
#
# queryset = PacFormXml.objects.filter(status="UNPROCESSED")
#
# logger.info("check_unprocessed_pacxml: Found {0} unprocessed records".format(queryset.count()))
#
# for pac_xml_record in queryset:
# vsitem = vs.get_item_for_atomid(pac_xml_record.atom_id)
# #this process will call out to Pluto to do the linkup once the data has been received
# if vsitem is not None:
# logger.info("check_unprocessed_pacxml: Found item {0} for atom {1}".format(vsitem.name, pac_xml_record.atom_id))
# proc.link_to_item(pac_xml_record, vsitem)
# logger.info("check_unprocessed_pacxml: linkup initiated for {0} from {1}".format(vsitem.name, pac_xml_record.atom_id))
# else:
# logger.info("check_unprocessed_pacxml: No items found for atom {0}".format(pac_xml_record.atom_id))
#
# logger.info("check_unprocessed_pacxml: run completed")
#
#
# def delete_s3_url(conn, s3_url):
#     from urllib.parse import urlparse
#
#     parsed = urlparse(s3_url)
#     if parsed.scheme != "s3":
#         raise ValueError("delete_s3_url called on something not an s3 url (was {0})".format(parsed.scheme))
#
#     bucket = conn.get_bucket(parsed.hostname)
#     if parsed.path.startswith('/'):
#         path = parsed.path[1:]
#     else:
#         path = parsed.path
#
#     bucket.delete_key(path)
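#
# Illustrative usage (the bucket name and key are made up, not from this repo):
# delete_s3_url(conn, "s3://some-bucket/pac/form.xml") parses the URL and calls
# bucket.delete_key("pac/form.xml") on the "some-bucket" bucket.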
#
#
# @periodic_task(run_every=crontab(minute=55))
# def expire_processed_pacrecords():
# """
# Scheduled task to remove pac xml records for items that have been processed
# :return:
# """
#
# from .models import PacFormXml
# from django.conf import settings
# from .master_importer import S3Mixin
#
# queryset = PacFormXml.objects.filter(status="PROCESSED",completed__lte=datetime.now()-timedelta(days=1))
#
# logger.info("expire_processed_pacrecords: found {0} records processed and older than 1 day to purge".format(queryset.count()))
# s3_connector = S3Mixin(settings.ATOM_RESPONDER_ROLE_NAME, "AutoDeletePacSession")
#
# conn = s3_connector.get_s3_connection()
#
# for pac_record in queryset:
# logger.info("expire_processed_pacrecords: purging record for atom {0}".format(pac_record.atom_id))
# delete_s3_url(conn, pac_record.pacdata_url)
# pac_record.delete()
# logger.info("expire_processed_pacrecords completed")
#
#
# @shared_task
# def timed_retry_process_message(record, approx_arrival, attempt=0):
# """
# task that can be used to retry an ingest if messages have arrived before content is available
# :return:
# """
# from .master_importer import MasterImportResponder
# from .management.commands.run_atom_responder import Command as AtomResponderCommand
# import json
#
# content = json.loads(record)
# logger.info("{0}: starting timed retry".format(content['atomId']))
# imp = MasterImportResponder(AtomResponderCommand.role_name, AtomResponderCommand.session_name,
# AtomResponderCommand.stream_name, "timed-resync",
# aws_access_key_id=settings.MEDIA_ATOM_AWS_ACCESS_KEY_ID,
# aws_secret_access_key=settings.MEDIA_ATOM_AWS_SECRET_ACCESS_KEY)
#
# imp.process(record, approx_arrival, attempt=attempt)
# logger.info("{0}: timed retry completed".format(content['atomId']))