def _sign_rtmp_url()

in oss2/auth.py [0:0]


    def _sign_rtmp_url(self, url, bucket_name, channel_name, expires, params):
        credentials = self.credentials_provider.get_credentials()
        if credentials.get_security_token():
            params['security-token'] = credentials.get_security_token()

        expiration_time = int(time.time()) + expires

        canonicalized_resource = "/%s/%s" % (bucket_name, channel_name)
        canonicalized_params = []

        if params:
            items = params.items()
            for k, v in items:
                if k != "OSSAccessKeyId" and k != "Signature" and k != "Expires" and k != "SecurityToken":
                    canonicalized_params.append((k, v))

        canonicalized_params.sort(key=lambda e: e[0])
        canon_params_str = ''
        for k, v in canonicalized_params:
            canon_params_str += '%s:%s\n' % (k, v)

        p = params if params else {}
        string_to_sign = str(expiration_time) + "\n" + canon_params_str + canonicalized_resource
        logger.debug('Sign Rtmp url: string to be signed = {0}'.format(string_to_sign))


        h = hmac.new(to_bytes(credentials.get_access_key_secret()), to_bytes(string_to_sign), hashlib.sha1)
        signature = utils.b64encode_as_string(h.digest())

        p['OSSAccessKeyId'] = credentials.get_access_key_id()
        p['Expires'] = str(expiration_time)
        p['Signature'] = signature

        return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in p.items())