in oss2/auth.py [0:0]
def _sign_rtmp_url(self, url, bucket_name, channel_name, expires, params):
credentials = self.credentials_provider.get_credentials()
if credentials.get_security_token():
params['security-token'] = credentials.get_security_token()
expiration_time = int(time.time()) + expires
canonicalized_resource = "/%s/%s" % (bucket_name, channel_name)
canonicalized_params = []
if params:
items = params.items()
for k, v in items:
if k != "OSSAccessKeyId" and k != "Signature" and k != "Expires" and k != "SecurityToken":
canonicalized_params.append((k, v))
canonicalized_params.sort(key=lambda e: e[0])
canon_params_str = ''
for k, v in canonicalized_params:
canon_params_str += '%s:%s\n' % (k, v)
p = params if params else {}
string_to_sign = str(expiration_time) + "\n" + canon_params_str + canonicalized_resource
logger.debug('Sign Rtmp url: string to be signed = {0}'.format(string_to_sign))
h = hmac.new(to_bytes(credentials.get_access_key_secret()), to_bytes(string_to_sign), hashlib.sha1)
signature = utils.b64encode_as_string(h.digest())
p['OSSAccessKeyId'] = credentials.get_access_key_id()
p['Expires'] = str(expiration_time)
p['Signature'] = signature
return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in p.items())