def authenticate()

in sample/topic/server.py [0:0]


    def authenticate(self):
        # get string to signature
        service_str = "\n".join(sorted(["%s:%s" % (k, v) for k, v in self.headers.items() if k.startswith("x-mns-")]))
        sign_header_list = []
        for key in ["content-md5", "content-type", "date"]:
            if key in self.headers.keys():
                sign_header_list.append(self.headers[key])
            else:
                sign_header_list.append("")
        str2sign = u"%s\n%s\n%s\n%s" % (self.command, "\n".join(sign_header_list), service_str, self.path)
        print(str2sign)
        # verify
        # authorization = self.headers.getheader('Authorization')
        authorization = self.headers['Authorization']
        signature = base64.b64decode(authorization)
        cert = urlopen(base64.b64decode(self.headers['x-mns-signing-cert-url']).decode('utf-8')).read()
        cert_str = cert.decode('utf-8')

        from Cryptodome.Util.asn1 import DerSequence
        from Cryptodome.PublicKey import RSA
        from Cryptodome.Signature import PKCS1_v1_5
        from Cryptodome.Hash import SHA
        from binascii import a2b_base64
        # Convert from PEM to DER
        lines = cert_str.replace(" ", '').split()
        der = a2b_base64(''.join(lines[1:-1]))

        # Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
        cert = DerSequence()
        cert.decode(der)
        tbs_certificate = DerSequence()
        tbs_certificate.decode(cert[0])
        subject_public_key_info = tbs_certificate[6]

        # Initialize RSA key
        key = RSA.importKey(subject_public_key_info)
        h = SHA.new(str2sign.encode('utf-8'))
        verifier = PKCS1_v1_5.new(key)
        if verifier.verify(h, signature):
            return True
        return False