def acquire_token()

in azure/Kqlmagic/my_aad_helper.py [0:0]


    def acquire_token(self):
        """Acquire a token from AAD and return the matching authorization header.

        The current token, if any, is first passed through
        ``_validate_and_refresh_token``. When no token survives, the following
        sources are tried in order, stopping at the first one that yields a
        validated token:

        1. the ``try_token`` option,
        2. the ``try_msi`` option,
        3. the ``try_azcli_login_subscription`` option,
        4. the ``try_azcli_login`` option,
        5. the ``try_azcli_login_by_profile`` option,
        6. the SSO ADAL context (``self._adal_context_sso``),
        7. the regular ADAL context (``self._adal_context``),
        8. the configured ``self._authentication_method`` (username/password,
           application key, application certificate or device-code login).

        Returns:
            Whatever ``self._create_authorization_header()`` returns once
            ``self._current_token`` holds a validated token.

        Raises:
            AuthenticationError: on any failure, including when no source
                produced a valid token. The original exception is wrapped,
                chained as ``__cause__``, and complemented with the keyword
                arguments from ``self._get_authentication_error_kwargs()``.
        """
        previous_token = self._current_token
        try:
            if self._current_token is not None:
                self._current_token = self._validate_and_refresh_token(self._current_token)

            # No usable token: forget which method/context produced the old one.
            if self._current_token is None:
                self._current_authentication_method = None
                self._current_adal_context = None

            # Each fallback below runs only while no valid token has been found yet.
            if self._current_token is None and self._options.get("try_token") is not None:
                token = self._get_aux_token(token=self._options.get("try_token"))
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None and self._options.get("try_msi") is not None:
                token = self._get_msi_token(msi_params=self._options.get("try_msi"))
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None and self._options.get("try_azcli_login_subscription") is not None:
                token = self._get_azcli_token(subscription=self._options.get("try_azcli_login_subscription"))
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None and self._options.get("try_azcli_login"):
                token = self._get_azcli_token()
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None and self._options.get("try_azcli_login_by_profile"):
                token = self._get_azcli_token()
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None and self._adal_context_sso is not None:
                token = self._get_adal_sso_token()
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None and self._adal_context is not None:
                token = self._get_adal_token()
                self._current_token = self._validate_and_refresh_token(token)

            # Last resort: authenticate with the explicitly configured method.
            if self._current_token is None:
                token = None
                self._current_authentication_method = self._authentication_method
                self._current_adal_context = self._adal_context_sso or self._adal_context

                if self._authentication_method is AuthenticationMethod.aad_username_password:
                    logger().debug(f"_MyAadHelper::acquire_token - aad/user-password - resource: '{self._resource}', username: '{self._username}', password: '...', client: '{self._client_id}'")
                    token = self._current_adal_context.acquire_token_with_username_password(self._resource, self._username, self._password, self._client_id)

                elif self._authentication_method is AuthenticationMethod.aad_application_key:
                    logger().debug(f"_MyAadHelper::acquire_token - aad/client-secret - resource: '{self._resource}', client: '{self._client_id}', secret: '...'")
                    token = self._current_adal_context.acquire_token_with_client_credentials(self._resource, self._client_id, self._client_secret)

                elif self._authentication_method is AuthenticationMethod.aad_application_certificate:
                    logger().debug(f"_MyAadHelper::acquire_token - aad/client-certificate - resource: '{self._resource}', client: '{self._client_id}', _certificate: '...', thumbprint: '{self._thumbprint}'")
                    token = self._current_adal_context.acquire_token_with_client_certificate(self._resource, self._client_id, self._certificate, self._thumbprint)

                elif self._authentication_method is AuthenticationMethod.aad_code_login:
                    logger().debug(f"_MyAadHelper::acquire_token - aad/code - resource: '{self._resource}', client: '{self._client_id}'")
                    code: dict = self._current_adal_context.acquire_user_code(self._resource, self._client_id)
                    url = code[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL]
                    device_code = code[OAuth2DeviceCodeResponseParameters.USER_CODE].strip()

                    # Resolve "auto" into a concrete way of presenting the device code,
                    # based on the notebook front end and where the kernel runs.
                    device_code_login_notification = self._options.get("device_code_login_notification")
                    if device_code_login_notification == "auto":
                        notebook_app = self._options.get("notebook_app")
                        if notebook_app in ["ipython", "visualstudiocode", "azuredatastudio"]:
                            device_code_login_notification = "popup_interaction"
                        elif notebook_app in ["nteract"]:
                            if self._options.get("kernel_location") == "local":
                                # nteract cannot execute authentication script, workaround using temp_file_server webbrowser
                                if self._options.get("temp_files_server_address") is not None:
                                    import urllib.parse
                                    url = f'{self._options.get("temp_files_server_address")}/webbrowser?url={urllib.parse.quote(url)}&kernelid={self._options.get("kernel_id")}'
                                    device_code_login_notification = "popup_interaction"
                                else:
                                    device_code_login_notification = "browser"
                            else:
                                device_code_login_notification = "terminal"
                        else:
                            device_code_login_notification = "button"

                    copy_code_at_kernel = (
                        self._options.get("kernel_location") == "local"
                        or device_code_login_notification in ["browser"]
                        or (device_code_login_notification == "popup_interaction" and self._options.get("popup_interaction") == "webbrowser_open_at_kernel")
                    )
                    if copy_code_at_kernel:
                        # Copy code to local clipboard. This is a convenience only, so a
                        # failure must never abort authentication; it is logged instead.
                        # Only Exception is caught so that KeyboardInterrupt/SystemExit
                        # still propagate.
                        try:
                            pyperclip = Dependencies.get_module("pyperclip", dont_throw=True)
                            if pyperclip is not None:
                                pyperclip.copy(device_code)
                        except Exception as clipboard_error:
                            logger().debug(f"_MyAadHelper::acquire_token - failed to copy device code to clipboard: {clipboard_error}")

                    # if  self._options.get("notebook_app")=="papermill" and self._options.get("login_code_destination") =="browser":
                    #     raise Exception("error: using papermill without an email specified is not supported")
                    if device_code_login_notification == "email":
                        params = Parser.parse_and_get_kv_string(self._options.get('device_code_notification_email'), {})
                        email_notification = EmailNotification(**params)
                        subject = f"Kqlmagic device_code {device_code} authentication (context: {email_notification.context})"
                        resource = self._resource.replace("://", ":// ")  # just to make sure it won't be replaced in email by safelinks
                        email_message = f"Device_code: {device_code}\n\nYou are asked to authorize access to resource: " \
                                        f"{resource}\n\nOpen the page {url} and enter the code {device_code} to authenticate\n\nKqlmagic"
                        email_notification.send_email(subject, email_message)
                        info_message = f"An email was sent to {email_notification.send_to} with device_code {device_code} to authenticate"
                        Display.showInfoMessage(info_message, display_handler_name='acquire_token', **self._options)

                    elif device_code_login_notification == "browser":
                        # this print is not for debug
                        print(code[OAuth2DeviceCodeResponseParameters.MESSAGE])
                        webbrowser.open(code[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL])

                    elif device_code_login_notification == "terminal":
                        # this print is not for debug
                        print(code[OAuth2DeviceCodeResponseParameters.MESSAGE])
                        sys.stdout.flush()  # Some terminal needs this to ensure the message is shown

                        # Ideally you should wait here, in order to save some unnecessary polling
                        # TODO: add flag to prompt
                        # input("Press Enter after signing in from another device to proceed, CTRL+C to abort.")

                    elif device_code_login_notification == "popup_interaction" and self._options.get("popup_interaction") != "memory_button":
                        before_text = f"<b>{device_code}</b>"
                        button_text = "Copy code to clipboard and authenticate"
                        Display.show_window(
                            'verification_url',
                            url,
                            button_text=button_text,
                            # palette=Display.info_style,
                            before_text=before_text,
                            display_handler_name='acquire_token',
                            **self._options
                        )

                    else:  # device_code_login_notification == "button":
                        # Render a button that copies the code and opens the
                        # verification page in a new browser window.
                        html_str = (
                            f"""<!DOCTYPE html>
                            <html><body>

                            <!-- h1 id="user_code_p"><b>{device_code}</b><br></h1-->

                            <input  id="kql_MagicCodeAuthInput" type="text" readonly style="font-weight: bold; border: none;" size={single_quote(len(device_code))} value={single_quote(device_code)}>

                            <button id='kql_MagicCodeAuth_button', onclick="this.style.visibility='hidden';kql_MagicCodeAuthFunction()">Copy code to clipboard and authenticate</button>

                            <script>
                            var kql_MagicUserCodeAuthWindow = null;
                            function kql_MagicCodeAuthFunction() {{
                                /* Get the text field */
                                var copyText = document.getElementById("kql_MagicCodeAuthInput");

                                /* Select the text field */
                                copyText.select();

                                /* Copy the text inside the text field */
                                document.execCommand("copy");

                                /* Alert the copied text */
                                // alert("Copied the text: " + copyText.value);

                                var w = screen.width / 2;
                                var h = screen.height / 2;
                                params = 'width='+w+',height='+h
                                kql_MagicUserCodeAuthWindow = window.open('{url}', 'kql_MagicUserCodeAuthWindow', params);

                                // TODO: save selected cell index, so that the clear will be done on the lince cell
                            }}
                            </script>

                            </body></html>"""
                        )
                        Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

                    try:
                        token = self._current_adal_context.acquire_token_with_device_code(self._resource, code, self._client_id)
                        logger().debug(f"_MyAadHelper::acquire_token - got token - resource: '{self._resource}', client: '{self._client_id}', token type: '{type(token)}'")
                        self._username = self._username or token.get(TokenResponseFields.USER_ID)

                    finally:
                        # Runs whether or not the device-code flow succeeded: emit the
                        # script that closes the authentication window and clears the cell.
                        html_str = """<!DOCTYPE html>
                            <html><body><script>

                                // close authentication window
                                if (kql_MagicUserCodeAuthWindow && kql_MagicUserCodeAuthWindow.opener != null && !kql_MagicUserCodeAuthWindow.closed) {
                                    kql_MagicUserCodeAuthWindow.close()
                                }
                                // TODO: make sure, you clear the right cell. BTW, not sure it is a must to do any clearing

                                // clear output cell
                                Jupyter.notebook.clear_output(Jupyter.notebook.get_selected_index())

                                // TODO: if in run all mode, move to last cell, otherwise move to next cell
                                // move to next cell

                            </script></body></html>"""

                        Display.show_html(html_str, display_handler_name='acquire_token', **self._options)

                # token is still None here when the configured method matched no branch.
                self._current_token = self._validate_and_refresh_token(token)

            if self._current_token is None:
                raise AuthenticationError("No valid token.")

            if self._current_token != previous_token:
                self._warn_token_diff_from_conn_str()
            else:
                logger().debug(f"_MyAadHelper::acquire_token - valid token exist - resource: '{self._resource}', username: '{self._username}', client: '{self._client_id}'")

            return self._create_authorization_header()
        except Exception as e:
            # Wrap every failure (including the "No valid token." error above) with
            # the authentication context, keeping the original as the explicit cause.
            kwargs = self._get_authentication_error_kwargs()
            raise AuthenticationError(e, **kwargs) from e