alibabacloud-gateway-fc/util/python/alibabacloud_gateway_fc_util/client.py (212 lines of code) (raw):

# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import asyncio
import base64
import email.utils
import hashlib
import hmac
import logging
import urllib.parse
from typing import (
    Dict,
)

import requests
from alibabacloud_credentials.client import Client as CredentialClient
from requests import (
    Response,
    Request,
)

# HTTPHeaderContentMD5 key in request headers
HTTPHeaderContentMD5 = "content-md5"
# HTTPHeaderPrefix key prefix in request headers
HTTPHeaderPrefix = "x-fc-"
# HTTPHeaderDate key in request headers
HTTPHeaderDate = "date"
# AuthQueryKeyExpires key in request headers
AuthQueryKeyExpires = "x-fc-expires"
# HTTPHeaderContentType key in request headers
HTTPHeaderContentType = "content-type"
# HTTPHeaderAuthorization key in request headers
HTTPHeaderAuthorization = "authorization"
# HTTPHeaderSecurityToken key in request headers
HTTPHeaderSecurityToken = "x-fc-security-token"

# Module-level logger: the module-level ``logging.debug()`` helper would
# implicitly call ``logging.basicConfig()`` on the root logger, which a library
# must not do on behalf of the application.
_logger = logging.getLogger(__name__)


class Client:
    """
    Helper that builds, signs and sends HTTP requests using the credentials
    supplied at construction time.

    The ``*_async`` methods mirror their synchronous counterparts. Signing is
    delegated to the synchronous implementation; the network round trip is
    executed on the event loop's default executor.
    """
    _credential: CredentialClient = None

    def __init__(
        self,
        cred: CredentialClient,
    ):
        """
        :param cred: credential client providing the access key id, access key
            secret and (optionally) a security token.
        """
        self._credential = cred

    def invoke_httptrigger(
        self,
        url: str,
        method: str,
        body: bytes,
        headers: Dict[str, str],
    ) -> Response:
        """
        Build a request, sign it and send it.

        :param url: full request URL, including any query string.
        :param method: HTTP method.
        :param body: request payload.
        :param headers: request headers; the mapping is copied, not modified.
        :return: the response returned by ``requests``.
        """
        req = self.build_httprequest(url, method, body, headers)
        return self.send_httprequest_with_authorization(req)

    async def invoke_httptrigger_async(
        self,
        url: str,
        method: str,
        body: bytes,
        headers: Dict[str, str],
    ) -> Response:
        """
        Asynchronous variant of :meth:`invoke_httptrigger`.

        :param url: full request URL, including any query string.
        :param method: HTTP method.
        :param body: request payload.
        :param headers: request headers; the mapping is copied, not modified.
        :return: the response returned by ``requests``.
        """
        req = await self.build_httprequest_async(url, method, body, headers)
        return await self.send_httprequest_with_authorization_async(req)

    def invoke_anonymous_httptrigger(
        self,
        url: str,
        method: str,
        body: bytes,
        headers: Dict[str, str],
    ) -> Response:
        """
        Build a request and send it without adding any signature headers.

        :param url: full request URL, including any query string.
        :param method: HTTP method.
        :param body: request payload.
        :param headers: request headers; the mapping is copied, not modified.
        :return: the response returned by ``requests``.
        """
        req = self.build_httprequest(url, method, body, headers)
        return self.send_httprequest(req)

    async def invoke_anonymous_httptrigger_async(
        self,
        url: str,
        method: str,
        body: bytes,
        headers: Dict[str, str],
    ) -> Response:
        """
        Asynchronous variant of :meth:`invoke_anonymous_httptrigger`.

        :param url: full request URL, including any query string.
        :param method: HTTP method.
        :param body: request payload.
        :param headers: request headers; the mapping is copied, not modified.
        :return: the response returned by ``requests``.
        """
        req = await self.build_httprequest_async(url, method, body, headers)
        return await self.send_httprequest_async(req)

    def send_httprequest_with_authorization(
        self,
        req: Request,
    ) -> Response:
        """
        Sign ``req`` in place and send it.

        :param req: the request to sign and send; its headers are modified.
        :return: the response returned by ``requests``.
        """
        signed_request = self.sign_request(req)
        return self.send_httprequest(signed_request)

    async def send_httprequest_with_authorization_async(
        self,
        req: Request,
    ) -> Response:
        """
        Asynchronous variant of :meth:`send_httprequest_with_authorization`.

        :param req: the request to sign and send; its headers are modified.
        :return: the response returned by ``requests``.
        """
        signed_request = await self.sign_request_async(req)
        return await self.send_httprequest_async(signed_request)

    def send_httprequest(
        self,
        req: Request,
    ) -> Response:
        """
        Prepare and send ``req`` through a short-lived ``requests.Session``.

        :param req: the request to send.
        :return: the response returned by ``requests``.
        """
        with requests.Session() as s:
            p = s.prepare_request(req)
            return s.send(p)

    async def send_httprequest_async(
        self,
        req: Request,
    ) -> Response:
        """
        Send ``req`` without blocking the running event loop.

        ``requests`` is a blocking library, so the call is handed to the event
        loop's default executor instead of being run inside the coroutine.

        :param req: the request to send.
        :return: the response returned by ``requests``.
        """
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.send_httprequest, req)

    def sign_request(
        self,
        req: Request,
    ) -> Request:
        """
        Sign ``req`` in place without a Content-MD5 value.

        :param req: the request to sign; its headers are modified.
        :return: the same request object.
        """
        # FIXME Request.data is too flexible, and we can't calculate the md5 value of it properly.
        return self.sign_request_with_content_md5(req, '')

    async def sign_request_async(
        self,
        req: Request,
    ) -> Request:
        """
        Asynchronous variant of :meth:`sign_request`.

        :param req: the request to sign; its headers are modified.
        :return: the same request object.
        """
        # FIXME Request.data is too flexible, and we can't calculate the md5 value of it properly.
        return await self.sign_request_with_content_md5_async(req, '')

    def sign_request_with_content_md5(
        self,
        req: Request,
        content_md5: str,
    ) -> Request:
        """
        Add the security token (when the credential provides one), the optional
        Content-MD5, the date and the authorization headers to ``req``.

        :param req: the request to sign; its headers are modified.
        :param content_md5: Content-MD5 header value; skipped when empty.
        :return: the same request object.
        """
        security_token = self._credential.get_security_token()
        if security_token:
            req.headers[HTTPHeaderSecurityToken] = security_token
        if content_md5:
            req.headers[HTTPHeaderContentMD5] = content_md5
        req.headers[HTTPHeaderDate] = email.utils.formatdate(usegmt=True)

        parsed = urllib.parse.urlparse(req.url)
        # NOTE: the query is unquoted as a whole before auth_string splits it
        # on '&', so a percent-encoded '&' inside a value is not distinguished
        # from a separator.
        req.headers[HTTPHeaderAuthorization] = self.auth_string(
            req.method,
            urllib.parse.unquote(parsed.path),
            req.headers,
            urllib.parse.unquote(parsed.query),
        )
        return req

    async def sign_request_with_content_md5_async(
        self,
        req: Request,
        content_md5: str,
    ) -> Request:
        """
        Asynchronous variant of :meth:`sign_request_with_content_md5`.

        Delegates to the synchronous implementation so that both code paths
        always produce the same headers.

        :param req: the request to sign; its headers are modified.
        :param content_md5: Content-MD5 header value; skipped when empty.
        :return: the same request object.
        """
        return self.sign_request_with_content_md5(req, content_md5)

    @staticmethod
    def _get_header(headers, name: str, default: str = '') -> str:
        """
        Look up a header value, ignoring the case of the header name.

        An exact match on ``name`` wins, so a header written by the signing
        code under its lower-case key takes precedence over a caller-supplied
        variant in another case.

        :param headers: mapping of header names to values.
        :param name: lower-case header name to look up.
        :param default: value returned when the header is absent.
        :return: the header value, or ``default``.
        """
        if name in headers:
            return headers[name]
        for key, value in headers.items():
            if key.lower() == name:
                return value
        return default

    def auth_string(self, method: str, unescaped_path: str, headers: Dict[str, str], unescaped_queries: str) -> str:
        """
        Sign the request. See the spec for reference.
        https://help.aliyun.com/document_detail/52877.html

        :param method: method of the http request.
        :param unescaped_path: unescaped path without queries of the http request.
        :param headers: headers of the http request.
        :param unescaped_queries: unescaped query string of the http request
            (``&``-separated), may be empty.
        :return: the value for the authorization header.
        """
        # Header names are case-insensitive on the wire; a caller-provided
        # 'Content-Type' must take part in the signature just like
        # 'content-type' does.
        content_md5 = Client._get_header(headers, HTTPHeaderContentMD5)
        content_type = Client._get_header(headers, HTTPHeaderContentType)
        date = Client._get_header(headers, HTTPHeaderDate)
        canonical_headers = Client.build_canonical_headers(headers)

        if not unescaped_path:
            unescaped_path = '/'
        canonical_resource = unescaped_path + '\n'
        if unescaped_queries:
            # Sort the parameters, as get_sign_resource does, so the canonical
            # resource does not depend on their order in the URL.
            canonical_resource += '\n'.join(sorted(unescaped_queries.split('&')))

        access_key_id = self._credential.get_access_key_id()
        access_key_secret = self._credential.get_access_key_secret()

        string_to_sign = '\n'.join(
            [method.upper(), content_md5, content_type, date, canonical_headers + canonical_resource])
        _logger.debug('string to sign:%s', string_to_sign)

        h = hmac.new(access_key_secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256)
        signature = 'FC ' + access_key_id + ':' + base64.b64encode(h.digest()).decode('utf-8')
        return signature

    @staticmethod
    def build_canonical_headers(headers):
        """
        Build the canonical form of the ``x-fc-`` prefixed headers.

        :param headers: mapping of header names to string values.
        :return: one ``name:value`` line per matching header, with names
            lower-cased and lines sorted by name, each terminated by a newline;
            an empty string when no header matches.
        :rtype: String
        """
        canonical_headers = sorted(
            ((k.lower(), v) for k, v in headers.items() if k.lower().startswith(HTTPHeaderPrefix)),
            key=lambda item: item[0],
        )
        if not canonical_headers:
            return ''
        return '\n'.join(k + ':' + v for k, v in canonical_headers) + '\n'

    @staticmethod
    def get_sign_resource(unescaped_path, unescaped_queries):
        """
        Build the canonical resource from a path and a dict of query parameters.

        :param unescaped_path: unescaped path of the http request.
        :param unescaped_queries: dict mapping each key to a string value or to
            a sequence of values; an empty sequence contributes the bare key.
        :return: the path followed by a newline and the sorted, newline-joined
            parameters.
        :raises TypeError: if ``unescaped_queries`` is not a dict.
        """
        if not isinstance(unescaped_queries, dict):
            raise TypeError("`Dict` type required for queries")

        params = []
        for key, values in unescaped_queries.items():
            if isinstance(values, str):
                params.append('%s=%s' % (key, values))
                continue
            if len(values) > 0:
                for value in values:
                    params.append('%s=%s' % (key, value))
            else:
                params.append('%s' % key)
        params.sort()

        resource = unescaped_path + '\n' + '\n'.join(params)
        return resource

    def build_httprequest(
        self,
        url: str,
        method: str,
        body: bytes,
        headers: Dict[str, str],
    ) -> Request:
        """
        Create an unprepared ``requests.Request``.

        The headers are copied so that signing the request later does not
        write into the caller's mapping.

        :param url: full request URL, including any query string.
        :param method: HTTP method.
        :param body: request payload.
        :param headers: request headers; may be ``None`` or empty.
        :return: the new request object.
        """
        return Request(
            url=url,
            method=method,
            headers=dict(headers) if headers else {},
            data=body,
        )

    async def build_httprequest_async(
        self,
        url: str,
        method: str,
        body: bytes,
        headers: Dict[str, str],
    ) -> Request:
        """
        Asynchronous variant of :meth:`build_httprequest`.

        :param url: full request URL, including any query string.
        :param method: HTTP method.
        :param body: request payload.
        :param headers: request headers; may be ``None`` or empty.
        :return: the new request object.
        """
        return self.build_httprequest(url, method, body, headers)