azext_iot/common/sas_token_auth.py (43 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
sas_token_auth: Module containing Shared Access Signature token class.
"""
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
try:
from urllib import (urlencode, quote_plus)
except ImportError:
from urllib.parse import (urlencode, quote_plus)
from msrest.authentication import Authentication
class SasTokenAuthentication(Authentication):
"""
Shared Access Signature authorization for Azure IoT Hub.
Args:
uri (str): Uri of target resource.
shared_access_policy_name (str): Name of shared access policy.
shared_access_key (str): Shared access key.
expiry (int): Future expiry (in seconds) of the token to be generated.
"""
def __init__(self, uri, shared_access_policy_name, shared_access_key, expiry=600):
self.uri = uri
self.policy = shared_access_policy_name
self.key = shared_access_key
self.expiry = int(expiry)
def signed_session(self, session=None):
"""
Create requests session with SAS auth headers.
If a session object is provided, configure it directly. Otherwise,
create a new session and return it.
Returns:
session (): requests.Session.
"""
return self.refresh_session(session)
def refresh_session(self, session=None):
"""
Refresh requests session with SAS auth headers.
If a session object is provided, configure it directly. Otherwise,
create a new session and return it.
Returns:
session (): requests.Session.
"""
session = session or super(SasTokenAuthentication, self).signed_session()
session.headers['Authorization'] = self.generate_sas_token()
return session
def generate_sas_token(self, absolute=False):
"""
Create a shared access signature token as a string literal.
Args:
absolute (bool): In general the sas token ttl is generated relative to 'now' (UTC) + expiry.
Set to true to generate a sas token with no relative start.
Returns:
result (str): SAS token as string literal.
"""
encoded_uri = quote_plus(self.uri)
ttl = int(self.expiry) if absolute else int(time() + self.expiry)
sign_key = '%s\n%d' % (encoded_uri, ttl)
signature = b64encode(HMAC(b64decode(self.key), sign_key.encode('utf-8'), sha256).digest())
result = {
'sr': self.uri,
'sig': signature,
'se': str(ttl)
}
if self.policy:
result['skn'] = self.policy
return 'SharedAccessSignature ' + urlencode(result)
class BasicSasTokenAuthentication(Authentication):
"""
Basic Shared Access Signature authorization for Azure IoT Hub.
Args:
sas_token (str): sas token to use in authentication.
"""
def __init__(self, sas_token):
self.sas_token = sas_token
def signed_session(self):
"""
Create requests session with SAS auth headers.
Returns:
session (): requests.Session.
"""
session = super(BasicSasTokenAuthentication, self).signed_session()
session.headers['Authorization'] = self.sas_token
return session
def set_sas_token(self, new_token):
self.sas_token = new_token