rabbitmq/management/commands/resync_cached_commissions.py (80 lines of code) (raw):
import requests
from django.core.management.base import BaseCommand
import hashlib
import copy
from email.utils import formatdate
import hmac
import logging
import time
import sys
from rabbitmq.models import CachedCommission
from rabbitmq.serializers import CachedCommissionSerializer
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "directly re-synchronise our cache of commissions data with pluto-core"
def add_arguments(self, parser):
parser.add_argument("--server",type=str,help="URL location of the pluto-core instance to sync with")
parser.add_argument("--secret", type=str, help="shared secret for auth with pluto-core")
@staticmethod
def sign_request(original_headers:dict, method:str, path:str, content_type:str, content_body:str, shared_secret:str) -> dict:
"""
returns a dictionary including a suitable authorization header
:param original_headers: original content headers
:param content_body: data that is being sent
:return: new headers dictionary
"""
new_headers = copy.deepcopy(original_headers)
content_hasher = hashlib.sha384()
content_hasher.update(content_body.encode("UTF-8"))
nowdate = formatdate(usegmt=True)
checksumstring = content_hasher.hexdigest()
new_headers['Digest'] = "SHA-384=" + checksumstring
new_headers['Content-Length'] = str(len(content_body))
new_headers['Content-Type'] = content_type
new_headers['Date'] = nowdate
string_to_sign = """{path}\n{date}\n{content_type}\n{checksum}\n{method}""".format(
date=nowdate,content_type=content_type,checksum=checksumstring,
method=method,path=path
)
print("debug: string to sign: {0}".format(string_to_sign))
hmaccer = hmac.new(shared_secret.encode("UTF-8"), string_to_sign.encode("UTF-8"), hashlib.sha384)
result = hmaccer.hexdigest()
print("debug: final digest is {0}".format(result))
new_headers['Authorization'] = "HMAC {0}".format(result)
return new_headers
def next_page_of_commissions(self, url:str, shared_secret:str, start_at:int, length:int):
"""
generator that yields a commission dictionary
:param url:
:param shared_secret:
:param start_at:
:param length:
:return:
"""
urlpath = "/api/pluto/commission?startAt={}&length={}".format(start_at, length)
signed_headers = Command.sign_request({}, "GET", urlpath, "application/octet-stream", "", shared_secret)
response = requests.get(url + urlpath, headers=signed_headers, verify=False)
if response.status_code==200:
content = response.json()
for entry in content["result"]:
yield entry
elif response.status_code==404:
raise Exception("Endpoint not found? This indicates a code bug")
elif response.status_code==503 or response.status_code==502:
logger.warning("pluto-core not responding. Trying again in a few seconds...")
time.sleep(3)
for entry in self.next_page_of_commissions(url, shared_secret, start_at, length):
yield entry
else:
logger.error("Server error {0}".format(response.status_code))
logger.error(response.text)
raise Exception("Server error")
def process_entry(self, entry:dict):
"""
handle a raw data structure from pluto-core
:param entry:
:return:
"""
serializer = CachedCommissionSerializer(data=entry)
if not serializer.is_valid():
logger.warning("Data for {} was not valid".format(entry))
return
commission = CachedCommission(**serializer.validated_data)
logger.debug("updating cachedcommission {}".format(commission.title))
commission.save()
def handle(self, *args, **options):
if not options["server"] or not options["secret"]:
logger.error("You must specify --server and --secret on the command line")
sys.exit(1)
page_size = 100
start_at = 0
while True:
processed = 0
for entry in self.next_page_of_commissions(options["server"], options["secret"], start_at, page_size):
self.process_entry(entry)
start_at += 1
processed += 1
if processed==0:
logger.info("All done - processed {} commissions".format(start_at))
break