in azure/Kqlmagic/my_aad_helper_msal.py [0:0]
def acquire_token(self):
    """Acquire tokens from AAD.

    Token sources are tried in order, stopping at the first one that yields a
    token accepted by ``self._validate_and_refresh_token``:

    1. the token already held in ``self._current_token``;
    2. the opportunistic sources enabled through ``self._options``:
       ``try_token``, ``try_msi``, ``try_azcli_login_subscription``,
       ``try_azcli_login``, ``try_azcli_login_by_profile``, ``try_vscode_login``;
    3. a silent MSAL acquisition, first with the SSO client app (if any) and
       then with the regular client app (if any);
    4. a fresh acquisition selected by ``self._authentication_method``:
       username/password, client credentials (application key or certificate),
       interactive login, or device-code flow.

    The ``self._current_*`` attributes are updated along the way, and
    ``self._warn_token_diff_from_conn_str()`` is called when the resulting
    token differs from the one held on entry.

    Returns:
        The value returned by ``self._create_authorization_header()``.

    Raises:
        AuthenticationError: for any failure. Every exception raised inside
            the method (including the internal "Failed to create token"
            error) is wrapped, together with the keyword arguments returned
            by ``self._get_authentication_error_kwargs()``.
    """
    acquire_token_result = None
    previous_token = self._current_token
    try:
        if self._current_token is not None:
            self._current_token = self._validate_and_refresh_token(self._current_token)

        if self._current_token is None:
            # the previous token (if any) is gone, forget everything that described it
            self._current_authentication_method = None
            self._current_msal_client_app = None
            self._current_client_app_type = None
            self._current_scopes = None
            self._current_username = None

        if self._current_token is None:
            if self._options.get("try_token") is not None:
                token = self._get_aux_token(token=self._options.get("try_token"))
                self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None:
            if self._options.get("try_msi") is not None:
                token = self._get_msi_token(msi_params=self._options.get("try_msi"))
                self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None:
            if self._options.get("try_azcli_login_subscription") is not None:
                token = self._get_azcli_token(subscription=self._options.get("try_azcli_login_subscription"))
                self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None:
            if self._options.get("try_azcli_login"):
                token = self._get_azcli_token()
                self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None:
            if self._options.get("try_azcli_login_by_profile"):
                token = self._get_azcli_token_by_profile()
                self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None:
            if self._options.get("try_vscode_login"):
                token = self._get_vscode_token()
                self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None:
            # none of the opportunistic sources worked, fall back to the configured method
            self._current_authentication_method = self._authentication_method
            self._current_msal_client_app = self._msal_client_app_sso or self._msal_client_app
            self._current_client_app_type = self._client_app_type
            self._current_scopes = self._scopes
            self._current_username = self._username

        if self._current_token is None:
            if self._msal_client_app_sso is not None:
                self._current_msal_client_app = self._msal_client_app_sso
                self._current_token = self._acquire_msal_token_silent(sso_flow_code=True)

        if self._current_token is None:
            if self._msal_client_app is not None:
                self._current_msal_client_app = self._msal_client_app
                self._current_token = self._acquire_msal_token_silent()

        if self._current_token is None:
            logger().debug("No suitable token exists in cache. Let's get a new one from AAD.")
            self._current_msal_client_app = self._msal_client_app_sso or self._msal_client_app

            if self._authentication_method is AuthenticationMethod.aad_username_password:
                # See this page for constraints of Username Password Flow.
                # https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
                acquire_token_result = self._current_msal_client_app.acquire_token_by_username_password(
                    self._username, self._password, scopes=self._scopes)

            elif self._authentication_method is AuthenticationMethod.aad_application_key:
                acquire_token_result = self._current_msal_client_app.acquire_token_for_client(scopes=self._scopes)

            elif self._authentication_method is AuthenticationMethod.aad_application_certificate:
                acquire_token_result = self._current_msal_client_app.acquire_token_for_client(scopes=self._scopes)

            elif self._authentication_method is AuthenticationMethod.aad_code_login and (
                    self._options.get("code_auth_interactive_mode") == "auth_code" or (self._options.get("kernel_location") == "local" and self._options.get("code_auth_interactive_mode") == "auto")):
                acquire_token_result = self._current_msal_client_app.acquire_token_interactive(
                    scopes=self._scopes,
                    login_hint = self._current_username,
                    prompt="select_account")
                if TokenResponseFieldsV2.ACCESS_TOKEN in acquire_token_result:
                    logger().debug(f"_MyAadHelper::acquire_token - got token - scopes: '{self._scopes}', client: '{self._client_id}'")
                    self._username = self._username or self._get_token_user_id(acquire_token_result) or self._get_username_from_token(acquire_token_result)
                    self._current_username = self._username

            elif self._authentication_method is AuthenticationMethod.aad_code_login:
                flow = self._current_msal_client_app.initiate_device_flow(scopes=self._scopes)
                if OAuth2DeviceCodeResponseParameters.USER_CODE not in flow:
                    # without a user code there is nothing to show the user; fail with the
                    # returned payload instead of an opaque KeyError on the lookups below
                    raise ValueError(f"Failed to create device flow. Err: {flow}")
                url = flow[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL]
                device_code = flow[OAuth2DeviceCodeResponseParameters.USER_CODE].strip()

                device_code_login_notification = self._options.get("device_code_login_notification")
                if device_code_login_notification == "auto":
                    # pick the notification channel from the notebook application and kernel location
                    if self._options.get("notebook_app") in ["ipython"]:
                        device_code_login_notification = "popup_interaction"
                    elif self._options.get("notebook_app") in ["visualstudiocode", "azuredatastudio"]:
                        device_code_login_notification = "popup_interaction"
                    elif self._options.get("notebook_app") in ["nteract"]:
                        if self._options.get("kernel_location") == "local":
                            # nteract cannot execute authentication script, workaround using temp_file_server webbrowser
                            if self._options.get("temp_files_server_address") is not None:
                                import urllib.parse
                                indirect_url = f'{self._options.get("temp_files_server_address")}/webbrowser?url={urllib.parse.quote(url)}&kernelid={self._options.get("kernel_id")}'
                                url = indirect_url
                                device_code_login_notification = "popup_interaction"
                            else:
                                device_code_login_notification = "browser"
                        else:
                            device_code_login_notification = "terminal"
                    elif self._options.get("notebook_app") in ["azureml", "azuremljupyternotebook", "azuremljupyterlab"]:
                        device_code_login_notification = "terminal_reference"
                    else:
                        device_code_login_notification = "button"

                if (self._options.get("kernel_location") == "local"
                        or device_code_login_notification in ["browser"]
                        or (device_code_login_notification == "popup_interaction" and self._options.get("popup_interaction") == "webbrowser_open_at_kernel")):
                    # copy code to local clipboard
                    # best effort only: a clipboard failure must not abort the login, but
                    # KeyboardInterrupt / SystemExit must still propagate
                    try:
                        pyperclip = Dependencies.get_module("pyperclip", dont_throw=True)
                        if pyperclip is not None:
                            pyperclip.copy(device_code)
                    except Exception:
                        pass

                # if self._options.get("notebook_app")=="papermill" and self._options.get("login_code_destination") =="browser":
                #     raise Exception("error: using papermill without an email specified is not supported")
                if device_code_login_notification == "email":
                    params = Parser.parse_and_get_kv_string(self._options.get('device_code_notification_email'), {})
                    email_notification = EmailNotification(**params)
                    subject = f"Kqlmagic device_code {device_code} authentication (context: {email_notification.context})"
                    resource = self._resource.replace("://", ":// ")  # just to make sure it won't be replace in email by safelinks
                    email_message = (
                        f"Device_code: {device_code}\n\nYou are asked to authorize access to resource: {resource}\n\n"
                        f"Open the page {url} and enter the code {device_code} to authenticate\n\nKqlmagic"
                    )
                    email_notification.send_email(subject, email_message)
                    info_message =f"An email was sent to {email_notification.send_to} with device_code {device_code} to authenticate"
                    Display.showInfoMessage(info_message, display_handler_name='acquire_token', **self._options)

                elif device_code_login_notification == "browser":
                    # this print is not for debug
                    print(flow[OAuth2DeviceCodeResponseParameters.MESSAGE])
                    webbrowser.open(flow[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL])

                elif device_code_login_notification == "terminal":
                    # this print is not for debug
                    print(flow[OAuth2DeviceCodeResponseParameters.MESSAGE])
                    sys.stdout.flush()  # Some terminal needs this to ensure the message is shown
                    # Ideally you should wait here, in order to save some unnecessary polling
                    # TODO: add flag to prompt
                    # input("Press Enter after signing in from another device to proceed, CTRL+C to abort.")

                elif device_code_login_notification == "terminal_reference":
                    # copy code to local clipboard
                    # best effort only, same as above
                    try:
                        pyperclip = Dependencies.get_module("pyperclip", dont_throw=True)
                        if pyperclip is not None:
                            pyperclip.copy(device_code)
                    except Exception:
                        pass

                    html_str = (
                        f"""<!DOCTYPE html>
<html><body>
<input id="kql_MagicCodeAuthInput" type="text" readonly style="font-weight: bold; border: none;" size={single_quote(len(device_code))} value={single_quote(device_code)}>
<script>
function kql_MagicCopyCodeFunction() {{
/* Get the text field */
var copyText = document.getElementById("kql_MagicCodeAuthInput");
/* Select the text field */
copyText.select();
/* Copy the text inside the text field */
document.execCommand("copy");
/* Alert the copied text */
// alert("Copied the text: " + copyText.value);
}}
kql_MagicCopyCodeFunction()
</script>
</body></html>"""
                    )
                    Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

                    msg = f"Copy code to clipboard and authenticate here: {flow[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL]}"
                    # this print is not for debug
                    print(msg)
                    sys.stdout.flush()  # Some terminal needs this to ensure the message is shown
                    # Ideally you should wait here, in order to save some unnecessary polling
                    # TODO: add flag to prompt
                    # input("Press Enter after signing in from another device to proceed, CTRL+C to abort.")

                elif device_code_login_notification == "popup_interaction" and self._options.get("popup_interaction") != "memory_button":
                    before_text = f"<b>{device_code}</b>"
                    button_text = "Copy code to clipboard and authenticate"
                    # before_text = f"Copy code: {device_code} to verification url: {url} and "
                    # button_text='authenticate'
                    # Display.showInfoMessage(f"Copy code: {device_code} to verification url: {url} and authenticate", display_handler_name='acquire_token', **options)
                    Display.show_window(
                        'verification_url',
                        url,
                        button_text=button_text,
                        # palette=Display.info_style,
                        before_text=before_text,
                        display_handler_name='acquire_token',
                        **self._options
                    )

                else:  # device_code_login_notification == "button":
                    html_str = (
                        f"""<!DOCTYPE html>
<html><body>
<!-- h1 id="user_code_p"><b>{device_code}</b><br></h1-->
<input id="kql_MagicCodeAuthInput" type="text" readonly style="font-weight: bold; border: none;" size={single_quote(len(device_code))} value={single_quote(device_code)}>
<button id='kql_MagicCodeAuth_button', onclick="this.style.visibility='hidden';kql_MagicCodeAuthFunction()">Copy code to clipboard and authenticate</button>
<script>
var kql_MagicUserCodeAuthWindow = null;
function kql_MagicCodeAuthFunction() {{
/* Get the text field */
var copyText = document.getElementById("kql_MagicCodeAuthInput");
/* Select the text field */
copyText.select();
/* Copy the text inside the text field */
document.execCommand("copy");
/* Alert the copied text */
// alert("Copied the text: " + copyText.value);
var w = screen.width / 2;
var h = screen.height / 2;
params = 'width='+w+',height='+h
kql_MagicUserCodeAuthWindow = window.open('{url}', 'kql_MagicUserCodeAuthWindow', params);
// TODO: save selected cell index, so that the clear will be done on the lince cell
}}
</script>
</body></html>"""
                    )
                    Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

                #
                # wait for flow to finish
                #
                try:
                    # Ideally you should wait here, in order to save some unnecessary polling
                    # input("Press Enter after signing in from another device to proceed, CTRL+C to abort.")
                    acquire_token_result = self._current_msal_client_app.acquire_token_by_device_flow(flow)  # By default it will block
                    # You can follow this instruction to shorten the block time
                    # https://msal-python.readthedocs.io/en/latest/#msal.PublicClientApplication.acquire_token_by_device_flow
                    # or you may even turn off the blocking behavior,
                    # and then keep calling acquire_token_by_device_flow(flow) in your own customized loop.
                    if TokenResponseFieldsV2.ACCESS_TOKEN in acquire_token_result:
                        logger().debug(f"_MyAadHelper::acquire_token - got token - scopes: '{self._scopes}', client: '{self._client_id}'")
                        self._username = self._username or self._get_token_user_id(acquire_token_result) or self._get_username_from_token(acquire_token_result)
                        self._current_username = self._username
                finally:
                    # emitted whether or not the flow succeeded, so the login UI is always cleaned up
                    html_str = """<!DOCTYPE html>
<html><body><script>
// close authentication window
if (kql_MagicUserCodeAuthWindow && kql_MagicUserCodeAuthWindow.opener != null && !kql_MagicUserCodeAuthWindow.closed) {
kql_MagicUserCodeAuthWindow.close()
}
// TODO: make sure, you clear the right cell. BTW, not sure it is a must to do any clearing
// clear output cell
Jupyter.notebook.clear_output(Jupyter.notebook.get_selected_index())
// TODO: if in run all mode, move to last cell, otherwise move to next cell
// move to next cell
</script></body></html>"""
                    Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

            # only a result that actually carries an access token becomes the current token
            if acquire_token_result and TokenResponseFieldsV2.ACCESS_TOKEN in acquire_token_result:
                self._current_token = acquire_token_result

        if self._current_token is None:
            raise AuthenticationError("Failed to create token", acquire_token_result=acquire_token_result)

        if self._current_token != previous_token:
            self._warn_token_diff_from_conn_str()
        else:
            logger().debug(f"_MyAadHelper::acquire_token - valid token exist - scopes: '{self._scopes}', username: '{self._username}', client: '{self._client_id}'")

        return self._create_authorization_header()
    except Exception as e:
        kwargs = self._get_authentication_error_kwargs()
        # chain explicitly so the original failure stays attached as __cause__
        raise AuthenticationError(e, **kwargs) from e