in azure/Kqlmagic/my_aad_helper_msal.py [0:0]
def __init__(self, kcsb, default_clientid, msal_client_app=None, msal_client_app_sso=None, **options):
    """Set up authentication state from a connection-string builder.

    The authentication method is chosen from the credentials present on
    ``kcsb``, checked in this order:

    1. ``aad_user_id`` + ``password``            -> ``aad_username_password``
    2. ``application_client_id`` + ``application_key``
                                                 -> ``aad_application_key``
    3. ``application_client_id`` + ``application_certificate`` +
       ``application_certificate_thumbprint``    -> ``aad_application_certificate``
    4. anything else                             -> ``aad_code_login``

    Args:
        kcsb: Connection-string builder; the attributes read here are
            ``aad_user_id``, ``password``, ``application_client_id``,
            ``application_key``, ``application_certificate``,
            ``application_certificate_thumbprint``, ``data_source``,
            ``authority_id`` and ``conn_kv``.
        default_clientid: Client id used when ``kcsb.application_client_id``
            is falsy.
        msal_client_app: Forwarded to the base initializer and to
            ``_set_msal_client_app``.
        msal_client_app_sso: Forwarded to the base initializer and to
            ``_set_msal_client_app``.
        **options: Forwarded to the base initializer; a shallow copy is
            kept on the instance as ``_options``.
    """
    global global_msal_client_app, global_msal_client_app_sso

    super(_MyAadHelper, self).__init__(kcsb, default_clientid, msal_client_app, msal_client_app_sso, **options)

    # Select the authentication method; the first credential set that is
    # completely populated wins.
    self._username = None
    if kcsb.aad_user_id and kcsb.password:
        self._authentication_method = AuthenticationMethod.aad_username_password
        self._username = kcsb.aad_user_id
        self._password = kcsb.password
    elif kcsb.application_client_id and kcsb.application_key:
        self._authentication_method = AuthenticationMethod.aad_application_key
        # The application key is used directly as the client credential.
        self._client_secret = self._client_credential = kcsb.application_key
    elif (
        kcsb.application_client_id
        and kcsb.application_certificate
        and kcsb.application_certificate_thumbprint
    ):
        self._authentication_method = AuthenticationMethod.aad_application_certificate
        self._certificate = kcsb.application_certificate
        self._thumbprint = kcsb.application_certificate_thumbprint
        self._client_credential = dict(private_key=self._certificate, thumbprint=self._thumbprint)
    else:
        self._authentication_method = AuthenticationMethod.aad_code_login
        # A user id is optional for code login; keep it when supplied.
        self._username = kcsb.aad_user_id

    self._client_app_type = AuthenticationMethod.client_app_type.get(self._authentication_method)

    # "Sticky" record of what is currently in use, so that tokens are not
    # switched unless that is actually required.
    self._current_token = self._current_msal_client_app = None
    self._current_authentication_method = self._current_client_app_type = None
    self._current_scopes = self._current_username = None
    self._token_claims_cache = (None, None)

    self._try_token_msal_client_app = self._try_azcli_msal_client_app = None
    self._try_azcli_sub_msal_client_app = self._try_msi_msal_client_app = None

    # Authentication options are frozen when the object is created, so they
    # need not be specified on each query and cannot change behavior from
    # one query to the next.
    self._options = dict(options)

    # Warnings already displayed, tracked so they are not repeated.
    self._displayed_warnings = []

    # The resource keeps only the scheme and hostname of the data source.
    parsed_source = urlparse(kcsb.data_source)
    self._resource = f"{parsed_source.scheme}://{parsed_source.hostname}"
    self._scopes = [f"{self._resource}/.default"]

    # Authority falls back to "common" when the connection string has none.
    self._authority = kcsb.authority_id or "common"
    aad_url_from_conn_str = kcsb.conn_kv.get(ConnStrKeys.AAD_URL)
    self._aad_login_url = self._get_aad_login_url(aad_url_from_conn_str)
    self._authority_uri = f"{self._aad_login_url}/{self._authority}"

    self._client_id = kcsb.application_client_id or default_clientid
    self._client_app_key = self._create_client_app_key()

    self._set_msal_client_app(msal_client_app=msal_client_app, msal_client_app_sso=msal_client_app_sso)