gnm_deliverables/management/commands/validate_archive.py (98 lines of code) (raw):
from django.core.management.base import BaseCommand
import csv
from gnm_deliverables.models import DeliverableAsset
import logging
import hashlib
import hmac
from optparse import OptionParser
from datetime import datetime
import base64
from email.utils import formatdate
import requests
from time import mktime, sleep
from urllib.parse import urlparse
from pprint import pprint
import os.path
import requests
import sys
import logging
logging.basicConfig(level=logging.DEBUG)
def get_token(uri:str, secret:str, time:datetime) -> (str, str):
"""
create signing token for archivehunter
:param uri:
:param secret:
:return:
"""
httpdate = formatdate(timeval=mktime(time.timetuple()),localtime=False,usegmt=True)
url_parts = urlparse(uri)
string_to_sign = "{0}\n{1}".format(httpdate, url_parts.path)
print("string_to_sign: " + string_to_sign)
hm = hmac.new(secret.encode("UTF-8"), string_to_sign.encode("UTF-8"),hashlib.sha256)
return "HMAC {0}".format(base64.b64encode(hm.digest()).decode("UTF-8")), httpdate
class NotFoundResponse(Exception):
pass
class ForbiddenResponse(Exception):
pass
class ServerErrorResponse(Exception):
pass
def authenticated_request(uri:str, secret:str, verify=True, override_time:datetime=None) -> dict:
if override_time is not None:
timestamp = override_time
else:
timestamp = datetime.now()
authtoken, httpdate = get_token(uri, secret, timestamp)
headers = {
'X-Gu-Tools-HMAC-Date': httpdate,
'X-Gu-Tools-HMAC-Token': authtoken,
}
response = requests.get(uri, headers=headers, verify=verify)
if response.status_code==200:
return response.json()
elif response.status_code==404:
raise NotFoundResponse
elif response.status_code==403 or response.status_code==401:
logger.error("Server returned forbidden. Server said: {}".format(response.text))
raise ForbiddenResponse
elif response.status_code==500:
logger.error("Server error looking up. Server said: {}".format(response.text))
raise ServerErrorResponse
elif response.status_code==503 or response.status_code==504:
logger.warning("Server not available, retrying in 3s")
sleep(3)
return authenticated_request(uri, secret, verify=verify, override_time=override_time)
else:
raise Exception("Unexpected server response: {} {}".format(response.status_code, response.text))
logger = logging.getLogger(__name__)
class Command(BaseCommand):
"""
management command to validate that items apparently in archive are actually there
"""
help = 'Verify that the registered archive ID for deliverables is accurate'
def add_arguments(self, parser):
parser.add_argument("--output", type=str, default="report.csv", help="location to output a CSV report")
parser.add_argument("--server", type=str, help="base URL to Archive Hunter")
parser.add_argument("--secret", type=str, help="shared secret for authentication")
parser.add_argument("--insecure-no-verify", type=bool, default=False, help="don't verify SSL certs. Not recommended.")
queryset = DeliverableAsset.objects.exclude(archive_item_id__isnull=True).exclude(archive_item_id__exact="")
def handle(self, *args, **options):
pprint(options)
output_file_path = options["output"]
if not options["server"]:
print("You must specify --server on the commandline")
sys.exit(1)
if not options["secret"]:
print("You must specify --secret on the commandline")
sys.exit(1)
total_count = DeliverableAsset.objects.all().count()
archived_count = self.queryset.count()
try:
authenticated_request(os.path.join(options["server"],"api/entry","not-exist"), options["secret"])
except NotFoundResponse:
pass
with open(output_file_path, "w") as f:
writer = csv.writer(f, dialect=csv.excel)
writer.writerow(["Asset ID","Bundle ID", "Filename","Bundle name","Archive Id","Found"])
logger.info("Out of {} items registered, {} are in the archive".format(total_count, archived_count))
for asset in self.queryset:
if asset.archive_item_id is None:
logger.warning("Item {} has no archive id - should not have been included in search??".format(asset.filename))
try:
url = os.path.join(options["server"],"api/entry",asset.archive_item_id)
logger.debug("url is {0}".format(url))
authenticated_request(url, options["secret"])
logger.info("Found archived entry for {}".format(asset.filename))
except NotFoundResponse:
logger.info("No archived entry found for {}".format(asset.filename))
writer.writerow([asset.id, asset.deliverable_id, asset.filename, asset.deliverable.name, asset.archive_item_id, False])