azure/Kqlmagic/my_aad_helper.py [751:814]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            if aad_login_url is None:
                raise KqlEngineError(f"AAD is not known for this cloud '{cloud}', please use aadurl property in connection string.")
        return aad_login_url


    def _warn_on_token_validation_failure(self, message)->None:
        if self._options.get("auth_token_warnings"):
            if self._current_authentication_method is not None and message is not None:
                warn_message =f"Can't use '{self._current_authentication_method}' token entry, {message}'"
                Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)


    def _warn_token_diff_from_conn_str(self)->None:
        if self._options.get("auth_token_warnings"):
            token = self._current_token
            if token is not None:
                # to avoid more than one warning per connection, keep track of already displayed warnings
                access_token = self._get_token_access_token(token)
                key = hash((access_token))
                if key in self._displayed_warnings:
                    return
                else:
                    self._displayed_warnings.append(key)

                token_username = self._get_token_user_id(token) or self._get_username_from_token(token)
                if token_username is not None and self._username is not None and token_username != self._username:
                    warn_message =f"authenticated username '{token_username}' is different from connectiion string username '{self._username}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)

                token_authority_uri = self._get_token_authority(token) or self._get_authority_from_token(token)
                if token_authority_uri != self._authority_uri and not self._authority_uri.endswith("/common") and not token_authority_uri.endswith("/common"):
                    warn_message =f"authenticated authority '{token_authority_uri}' is different from connectiion string authority '{self._authority_uri}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)

                token_client_id = self._get_token_client_id(token) or self._get_client_id_from_token(token)
                if token_client_id is not None and self._client_id is not None and token_client_id != self._client_id:
                    warn_message =f"authenticated client_id '{token_client_id}' is different from connectiion string client_id '{self._client_id}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)

                token_resources = self._get_token_resource(token) or self._get_resources_from_token(token)
                if type(token_resources) == str:
                    token_resources = [token_resources]
                if token_resources is not None and self._resource is not None and self._resource not in token_resources:
                    warn_message =f"authenticated resources '{token_resources}' does not include connectiion string resource '{self._resource}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)


    def _get_authentication_error_kwargs(self):
        " collect info for AuthenticationError exception and raise it"
        kwargs = {}
        if self._current_authentication_method is AuthenticationMethod.aad_username_password:
            kwargs = {"username": self._username, "client_id": self._client_id}
        elif self._current_authentication_method is AuthenticationMethod.aad_application_key:
            kwargs = {"client_id": self._client_id}
        elif self._current_authentication_method is AuthenticationMethod.aad_code_login:
            kwargs = {"client_id": self._client_id}
        elif self._current_authentication_method is AuthenticationMethod.aad_application_certificate:
            kwargs = {"client_id": self._client_id, "thumbprint": self._thumbprint}
        elif self._current_authentication_method is AuthenticationMethod.managed_service_identity:
            kwargs = self._options.get("try_msi")
        elif self._current_authentication_method is AuthenticationMethod.azcli_login:
            pass
        elif self._current_authentication_method is AuthenticationMethod.azcli_login_by_profile:
            pass
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



azure/Kqlmagic/my_aad_helper_msal.py [1118:1181]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            if aad_login_url is None:
                raise KqlEngineError(f"AAD is not known for this cloud '{cloud}', please use aadurl property in connection string.")
        return aad_login_url


    def _warn_on_token_validation_failure(self, message)->None:
        if self._options.get("auth_token_warnings"):
            if self._current_authentication_method is not None and message is not None:
                warn_message =f"Can't use '{self._current_authentication_method}' token entry, {message}'"
                Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)


    def _warn_token_diff_from_conn_str(self)->None:
        if self._options.get("auth_token_warnings"):
            token = self._current_token
            if token is not None:
                # to avoid more than one warning per connection, keep track of already displayed warnings
                access_token = self._get_token_access_token(token)
                key = hash((access_token))
                if key in self._displayed_warnings:
                    return
                else:
                    self._displayed_warnings.append(key)

                token_username = self._get_token_user_id(token) or self._get_username_from_token(token)
                if token_username is not None and self._username is not None and token_username != self._username:
                    warn_message =f"authenticated username '{token_username}' is different from connectiion string username '{self._username}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)

                token_authority_uri = self._get_token_authority(token) or self._get_authority_from_token(token)
                if token_authority_uri != self._authority_uri and not self._authority_uri.endswith("/common") and not token_authority_uri.endswith("/common"):
                    warn_message =f"authenticated authority '{token_authority_uri}' is different from connectiion string authority '{self._authority_uri}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)

                token_client_id = self._get_token_client_id(token) or self._get_client_id_from_token(token)
                if token_client_id is not None and self._client_id is not None and token_client_id != self._client_id:
                    warn_message =f"authenticated client_id '{token_client_id}' is different from connectiion string client_id '{self._client_id}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)

                token_resources = self._get_token_resource(token) or self._get_resources_from_token(token)
                if type(token_resources) == str:
                    token_resources = [token_resources]
                if token_resources is not None and self._resource is not None and self._resource not in token_resources:
                    warn_message =f"authenticated resources '{token_resources}' does not include connectiion string resource '{self._resource}'"
                    Display.showWarningMessage(warn_message, display_handler_name='acquire_token', **self._options)


    def _get_authentication_error_kwargs(self):
        " collect info for AuthenticationError exception and raise it"
        kwargs = {}
        if self._current_authentication_method is AuthenticationMethod.aad_username_password:
            kwargs = {"username": self._username, "client_id": self._client_id}
        elif self._current_authentication_method is AuthenticationMethod.aad_application_key:
            kwargs = {"client_id": self._client_id}
        elif self._current_authentication_method is AuthenticationMethod.aad_code_login:
            kwargs = {"client_id": self._client_id}
        elif self._current_authentication_method is AuthenticationMethod.aad_application_certificate:
            kwargs = {"client_id": self._client_id, "thumbprint": self._thumbprint}
        elif self._current_authentication_method is AuthenticationMethod.managed_service_identity:
            kwargs = self._options.get("try_msi")
        elif self._current_authentication_method is AuthenticationMethod.azcli_login:
            pass
        elif self._current_authentication_method is AuthenticationMethod.azcli_login_by_profile:
            pass
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



