azext_iot/common/auth.py (33 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional
from azure.cli.core._profile import Profile
from msrest.authentication import Authentication
def get_aad_token(cli_ctx, resource: Optional[str] = None):
    """
    Fetch a raw AAD access token for a resource through the Azure CLI profile.

    The token is obtained with ``Profile.get_raw_token`` using
    ``subscription=None``, so no specific subscription is requested here.

    :param cli_ctx: Azure CLI context; used to build the Profile and, when no
        resource is given, to look up the default resource.
    :param resource: Azure resource endpoint the token is requested for. When
        empty or None, falls back to
        ``cli_ctx.cloud.endpoints.active_directory_resource_id``.
        Use 'az cloud show' command for other Azure resources
    :return: dict with the keys ``tokenType``, ``accessToken``, ``expiresOn``
        (``"N/A"`` when the raw token carries no ``expiresOn`` entry),
        ``subscription`` and ``tenant``.

    ``accessToken`` is a live bearer credential: keep the returned dict out of
    logs, telemetry and exception messages, and do not persist it.
    """
    if not resource:
        resource = cli_ctx.cloud.endpoints.active_directory_resource_id

    raw_token, subscription_id, tenant_id = Profile(cli_ctx=cli_ctx).get_raw_token(
        subscription=None, resource=resource
    )

    # Index rather than unpack: only the first three entries are relied upon.
    token_type = raw_token[0]
    access_token = raw_token[1]
    expires_on = raw_token[2].get("expiresOn", "N/A")

    token_info = {
        "tokenType": token_type,
        "accessToken": access_token,
        "expiresOn": expires_on,
        "subscription": subscription_id,
        "tenant": tenant_id,
    }
    return token_info
class IoTOAuth(Authentication):
    """
    Azure AD OAuth for Azure IoT Hub and DPS.

    Sets the ``Authorization`` header of a requests session from the AAD token
    returned by ``get_aad_token`` (token type followed by the access token).
    """

    def __init__(self, cli_ctx, resource_id: Optional[str] = None):
        # cli_ctx and resource_id are stored as-is and passed to
        # get_aad_token each time a session is signed or refreshed.
        self.cli_ctx = cli_ctx
        self.resource_id = resource_id

    def signed_session(self, session=None):
        """
        Create requests session with an AAD token Authorization header.

        Delegates to ``refresh_session``. If a session object is provided,
        it is configured directly; otherwise a new session is created and
        returned.

        Returns:
            session (): requests.Session.
        """
        return self.refresh_session(session)

    def refresh_session(
        self, session=None,
    ):
        """
        Refresh requests session with an AAD token Authorization header.

        ``get_aad_token`` is called on every invocation and the header is
        overwritten with its result. If a session object is provided, it is
        configured directly; otherwise a new session is taken from the parent
        class's ``signed_session``.

        Returns:
            session (): requests.Session.
        """
        if not session:
            session = super().signed_session()

        token = get_aad_token(cli_ctx=self.cli_ctx, resource=self.resource_id)
        token_type = token["tokenType"]
        access_token = token["accessToken"]

        # The header value is a bearer credential; it stays on the session
        # object, so avoid dumping session headers into logs or debug output.
        session.headers["Authorization"] = "{} {}".format(token_type, access_token)
        return session