in sample/topic/server.py [0:0]
def authenticate(self):
# get string to signature
service_str = "\n".join(sorted(["%s:%s" % (k, v) for k, v in self.headers.items() if k.startswith("x-mns-")]))
sign_header_list = []
for key in ["content-md5", "content-type", "date"]:
if key in self.headers.keys():
sign_header_list.append(self.headers[key])
else:
sign_header_list.append("")
str2sign = u"%s\n%s\n%s\n%s" % (self.command, "\n".join(sign_header_list), service_str, self.path)
print(str2sign)
# verify
# authorization = self.headers.getheader('Authorization')
authorization = self.headers['Authorization']
signature = base64.b64decode(authorization)
cert = urlopen(base64.b64decode(self.headers['x-mns-signing-cert-url']).decode('utf-8')).read()
cert_str = cert.decode('utf-8')
from Cryptodome.Util.asn1 import DerSequence
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.Hash import SHA
from binascii import a2b_base64
# Convert from PEM to DER
lines = cert_str.replace(" ", '').split()
der = a2b_base64(''.join(lines[1:-1]))
# Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
cert = DerSequence()
cert.decode(der)
tbs_certificate = DerSequence()
tbs_certificate.decode(cert[0])
subject_public_key_info = tbs_certificate[6]
# Initialize RSA key
key = RSA.importKey(subject_public_key_info)
h = SHA.new(str2sign.encode('utf-8'))
verifier = PKCS1_v1_5.new(key)
if verifier.verify(h, signature):
return True
return False