in azure/Kqlmagic/my_aad_helper.py [0:0]
def acquire_token(self):
    """Acquire a token from AAD and return the authorization header.

    The sources are tried in this order, stopping at the first one that
    yields a token accepted by ``_validate_and_refresh_token``:

    1. the token currently held by this helper (re-validated / refreshed),
    2. the ``try_token`` option,
    3. the ``try_msi`` option,
    4. the ``try_azcli_login_subscription`` option,
    5. the ``try_azcli_login`` option,
    6. the ``try_azcli_login_by_profile`` option,
    7. ``_get_adal_sso_token`` (when an SSO ADAL context is set),
    8. ``_get_adal_token`` (when an ADAL context is set),
    9. the configured authentication method: username/password,
       application key, application certificate, or device-code login.

    For device-code login the user is notified according to the
    ``device_code_login_notification`` option (email, browser, terminal,
    popup window, or an HTML button).

    Returns:
        Whatever ``self._create_authorization_header()`` returns.

    Raises:
        AuthenticationError: on any failure, including when no valid token
            could be obtained. The original exception is passed as the
            first argument, together with the keyword arguments returned
            by ``self._get_authentication_error_kwargs()``.
    """
    previous_token = self._current_token
    try:
        # 1. Re-validate (and possibly refresh) the token we already hold.
        if self._current_token is not None:
            self._current_token = self._validate_and_refresh_token(self._current_token)

        # The held token is gone or invalid: forget how it was obtained.
        if self._current_token is None:
            self._current_authentication_method = None
            self._current_adal_context = None

        # 2-8. Non-interactive sources; each is consulted only while no
        # valid token has been found yet (conditions short-circuit).
        if self._current_token is None and self._options.get("try_token") is not None:
            token = self._get_aux_token(token=self._options.get("try_token"))
            self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None and self._options.get("try_msi") is not None:
            token = self._get_msi_token(msi_params=self._options.get("try_msi"))
            self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None and self._options.get("try_azcli_login_subscription") is not None:
            token = self._get_azcli_token(subscription=self._options.get("try_azcli_login_subscription"))
            self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None and self._options.get("try_azcli_login"):
            token = self._get_azcli_token()
            self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None and self._options.get("try_azcli_login_by_profile"):
            token = self._get_azcli_token()
            self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None and self._adal_context_sso is not None:
            token = self._get_adal_sso_token()
            self._current_token = self._validate_and_refresh_token(token)

        if self._current_token is None and self._adal_context is not None:
            token = self._get_adal_token()
            self._current_token = self._validate_and_refresh_token(token)

        # 9. Fall back to the configured authentication method.
        if self._current_token is None:
            token = None
            self._current_authentication_method = self._authentication_method
            self._current_adal_context = self._adal_context_sso or self._adal_context

            if self._authentication_method is AuthenticationMethod.aad_username_password:
                logger().debug(f"_MyAadHelper::acquire_token - aad/user-password - resource: '{self._resource}', username: '{self._username}', password: '...', client: '{self._client_id}'")
                token = self._current_adal_context.acquire_token_with_username_password(self._resource, self._username, self._password, self._client_id)

            elif self._authentication_method is AuthenticationMethod.aad_application_key:
                logger().debug(f"_MyAadHelper::acquire_token - aad/client-secret - resource: '{self._resource}', client: '{self._client_id}', secret: '...'")
                token = self._current_adal_context.acquire_token_with_client_credentials(self._resource, self._client_id, self._client_secret)

            elif self._authentication_method is AuthenticationMethod.aad_application_certificate:
                logger().debug(f"_MyAadHelper::acquire_token - aad/client-certificate - resource: '{self._resource}', client: '{self._client_id}', _certificate: '...', thumbprint: '{self._thumbprint}'")
                token = self._current_adal_context.acquire_token_with_client_certificate(self._resource, self._client_id, self._certificate, self._thumbprint)

            elif self._authentication_method is AuthenticationMethod.aad_code_login:
                logger().debug(f"_MyAadHelper::acquire_token - aad/code - resource: '{self._resource}', client: '{self._client_id}'")
                code: dict = self._current_adal_context.acquire_user_code(self._resource, self._client_id)
                url = code[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL]
                device_code = code[OAuth2DeviceCodeResponseParameters.USER_CODE].strip()

                # Resolve "auto" into a concrete notification mode based on
                # the hosting notebook application and kernel location.
                device_code_login_notification = self._options.get("device_code_login_notification")
                if device_code_login_notification == "auto":
                    if self._options.get("notebook_app") in ["ipython", "visualstudiocode", "azuredatastudio"]:
                        device_code_login_notification = "popup_interaction"
                    elif self._options.get("notebook_app") in ["nteract"]:
                        if self._options.get("kernel_location") == "local":
                            # nteract cannot execute authentication script, workaround using temp_file_server webbrowser
                            if self._options.get("temp_files_server_address") is not None:
                                import urllib.parse
                                indirect_url = f'{self._options.get("temp_files_server_address")}/webbrowser?url={urllib.parse.quote(url)}&kernelid={self._options.get("kernel_id")}'
                                url = indirect_url
                                device_code_login_notification = "popup_interaction"
                            else:
                                device_code_login_notification = "browser"
                        else:
                            device_code_login_notification = "terminal"
                    else:
                        device_code_login_notification = "button"

                if (self._options.get("kernel_location") == "local"
                        or device_code_login_notification in ["browser"]
                        or (device_code_login_notification == "popup_interaction" and self._options.get("popup_interaction") == "webbrowser_open_at_kernel")):
                    # Copy code to local clipboard. This is best-effort: a
                    # clipboard failure must not abort the login, but
                    # KeyboardInterrupt / SystemExit must still propagate,
                    # hence `except Exception` rather than a bare `except`.
                    try:
                        pyperclip = Dependencies.get_module("pyperclip", dont_throw=True)
                        if pyperclip is not None:
                            pyperclip.copy(device_code)
                    except Exception as clipboard_error:
                        logger().debug(f"_MyAadHelper::acquire_token - failed to copy device code to clipboard: {clipboard_error}")

                # if self._options.get("notebook_app")=="papermill" and self._options.get("login_code_destination") =="browser":
                #     raise Exception("error: using papermill without an email specified is not supported")
                if device_code_login_notification == "email":
                    params = Parser.parse_and_get_kv_string(self._options.get('device_code_notification_email'), {})
                    email_notification = EmailNotification(**params)
                    subject = f"Kqlmagic device_code {device_code} authentication (context: {email_notification.context})"
                    resource = self._resource.replace("://", ":// ")  # just to make sure it won't be replace in email by safelinks
                    email_message = (
                        f"Device_code: {device_code}\n\nYou are asked to authorize access to resource: "
                        f"{resource}\n\nOpen the page {url} and enter the code {device_code} to authenticate\n\nKqlmagic"
                    )
                    email_notification.send_email(subject, email_message)
                    info_message = f"An email was sent to {email_notification.send_to} with device_code {device_code} to authenticate"
                    Display.showInfoMessage(info_message, display_handler_name='acquire_token', **self._options)

                elif device_code_login_notification == "browser":
                    # this print is not for debug
                    print(code[OAuth2DeviceCodeResponseParameters.MESSAGE])
                    webbrowser.open(code[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL])

                elif device_code_login_notification == "terminal":
                    # this print is not for debug
                    print(code[OAuth2DeviceCodeResponseParameters.MESSAGE])
                    sys.stdout.flush()  # Some terminal needs this to ensure the message is shown
                    # Ideally you should wait here, in order to save some unnecessary polling
                    # TODO: add flag to prompt
                    # input("Press Enter after signing in from another device to proceed, CTRL+C to abort.")

                elif device_code_login_notification == "popup_interaction" and self._options.get("popup_interaction") != "memory_button":
                    before_text = f"<b>{device_code}</b>"
                    button_text = "Copy code to clipboard and authenticate"
                    # before_text = f"Copy code: {device_code} to verification url: {url} and "
                    # button_text='authenticate'
                    # Display.showInfoMessage(f"Copy code: {device_code} to verification url: {url} and authenticate", display_handler_name='acquire_token', **options)
                    Display.show_window(
                        'verification_url',
                        url,
                        button_text=button_text,
                        # palette=Display.info_style,
                        before_text=before_text,
                        display_handler_name='acquire_token',
                        **self._options
                    )

                else:  # device_code_login_notification == "button":
                    html_str = (
                        f"""<!DOCTYPE html>
<html><body>
<!-- h1 id="user_code_p"><b>{device_code}</b><br></h1-->
<input id="kql_MagicCodeAuthInput" type="text" readonly style="font-weight: bold; border: none;" size={single_quote(len(device_code))} value={single_quote(device_code)}>
<button id='kql_MagicCodeAuth_button', onclick="this.style.visibility='hidden';kql_MagicCodeAuthFunction()">Copy code to clipboard and authenticate</button>
<script>
var kql_MagicUserCodeAuthWindow = null;
function kql_MagicCodeAuthFunction() {{
/* Get the text field */
var copyText = document.getElementById("kql_MagicCodeAuthInput");
/* Select the text field */
copyText.select();
/* Copy the text inside the text field */
document.execCommand("copy");
/* Alert the copied text */
// alert("Copied the text: " + copyText.value);
var w = screen.width / 2;
var h = screen.height / 2;
params = 'width='+w+',height='+h
kql_MagicUserCodeAuthWindow = window.open('{url}', 'kql_MagicUserCodeAuthWindow', params);
// TODO: save selected cell index, so that the clear will be done on the lince cell
}}
</script>
</body></html>"""
                    )
                    Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

                try:
                    token = self._current_adal_context.acquire_token_with_device_code(self._resource, code, self._client_id)
                    logger().debug(f"_MyAadHelper::acquire_token - got token - resource: '{self._resource}', client: '{self._client_id}', token type: '{type(token)}'")
                    # Remember the signed-in user when no username was configured.
                    self._username = self._username or token.get(TokenResponseFields.USER_ID)
                finally:
                    # Emitted whether or not the device-code flow succeeded:
                    # closes the authentication window and clears the cell.
                    html_str = """<!DOCTYPE html>
<html><body><script>
// close authentication window
if (kql_MagicUserCodeAuthWindow && kql_MagicUserCodeAuthWindow.opener != null && !kql_MagicUserCodeAuthWindow.closed) {
kql_MagicUserCodeAuthWindow.close()
}
// TODO: make sure, you clear the right cell. BTW, not sure it is a must to do any clearing
// clear output cell
Jupyter.notebook.clear_output(Jupyter.notebook.get_selected_index())
// TODO: if in run all mode, move to last cell, otherwise move to next cell
// move to next cell
</script></body></html>"""
                    Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

            self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None:
                raise AuthenticationError("No valid token.")

        if self._current_token != previous_token:
            self._warn_token_diff_from_conn_str()
        else:
            logger().debug(f"_MyAadHelper::acquire_token - valid token exist - resource: '{self._resource}', username: '{self._username}', client: '{self._client_id}'")

        return self._create_authorization_header()
    except Exception as e:
        # Every failure (including the "No valid token." error raised above)
        # is reported as an AuthenticationError enriched with context kwargs.
        kwargs = self._get_authentication_error_kwargs()
        raise AuthenticationError(e, **kwargs)