testscripts/request-move-file.py (59 lines of code) (raw):
#!/usr/bin/env python3
import hashlib
import hmac
import urllib.parse
from optparse import OptionParser
from datetime import datetime, timedelta
import base64
from email.utils import formatdate
import requests
from time import mktime
from urllib.parse import urlparse
from pprint import pprint
import json
#uncomment this block to get request-level debug output
# import logging
#
# try:
# import http.client as http_client
# except ImportError:
# # Python 2
# import httplib as http_client
# http_client.HTTPConnection.debuglevel = 1
#
# # You must initialize logging, otherwise you'll not see debug output.
# logging.basicConfig()
# logging.getLogger().setLevel(logging.DEBUG)
# requests_log = logging.getLogger("requests.packages.urllib3")
# requests_log.setLevel(logging.DEBUG)
# requests_log.propagate = True
def checksum(content:bytes) -> str:
"""
Calculates the SHA-384 checksum of the given content and returns the base64 encoded representation as a string
:param content: the content to hash
:return: a string representing the checksum
"""
digest = hashlib.sha384(content).digest()
return base64.b64encode(digest).decode("UTF-8")
def get_token(uri:str, secret:str, method:str, content:bytes, checksum:str) -> (str, str):
"""
Generates an HMAC token
:param uri: URI that is going to be accessed
:param secret: Shared secret with the server
:param method: HTTP method for the request
:param content: byte string of the request body. Can be empty.
:param checksum: SHA-384 checksum of the body content. If content is empty then this should be the SHA checksum of an empty string
:return: tuple consisting of the body of an "Authorization" header and the HTTP style date/time of the request.
"""
t = datetime.now()
httpdate = formatdate(timeval=mktime(t.timetuple()),localtime=False,usegmt=True)
url_parts = urlparse(uri)
content_length = len(content)
string_to_sign = "{}\n{}\n{}\n{}\n{}".format(httpdate, content_length, checksum, method, url_parts.path + "?" + url_parts.query)
print("string_to_sign: " + string_to_sign)
hm = hmac.digest(secret.encode("UTF-8"), msg=string_to_sign.encode("UTF-8"), digest=hashlib.sha384)
return "HMAC {0}".format(base64.b64encode(hm).decode("UTF-8")), httpdate
# START MAIN
parser = OptionParser()
parser.add_option("--host", dest="host", help="host to access", default="archivehunter.local.dev-gutools.co.uk")
parser.add_option("--no-verify", dest="sslnoverify", action="store_true", default="false", help="set this to disable SSL cert checking")
parser.add_option("-s", "--secret", dest="secret", help="shared secret to use")
parser.add_option("--id", dest="entry_id", help="ArchiveHunter ID of media to set proxy for")
parser.add_option("--dest", dest="dest", help="Collection (bucket) name to move the content to")
(options, args) = parser.parse_args()
if options.secret is None:
print("You must supply the password in --secret")
exit(1)
method = "PUT"
uri = "https://{host}/api/move/{fileid}?to={collection}".format(host=options.host,
fileid=urllib.parse.quote(options.entry_id).replace("/", "%2F"),
collection=urllib.parse.quote(options.dest))
print("uri is " + uri)
content_body = "".encode("UTF-8")
content_hash = checksum(content_body)
authtoken, httpdate = get_token(uri, options.secret, method, content_body, content_hash)
print(authtoken)
headers = {
'Date': httpdate,
'Authorization': authtoken,
'X-Sha384-Checksum': content_hash,
}
print(headers)
extra_kwargs = {}
if options.sslnoverify:
extra_kwargs['verify'] = False
response = requests.put(uri, headers=headers, **extra_kwargs)
print("Server returned {0}".format(response.status_code))
pprint(response.headers)
if response.status_code==200:
pprint(response.json())
else:
print(response.text)