in redshift_connector/plugin/browser_azure_credentials_provider.py [0:0]
def fetch_saml_response(self: "BrowserAzureCredentialsProvider", token):
import requests
url: str = "https://login.microsoftonline.com/{tenant}/oauth2/token".format(tenant=self.idp_tenant)
# headers to pass with POST request
headers: typing.Dict[str, str] = azure_headers
# required parameters to pass in POST body
payload: typing.Dict[str, typing.Optional[str]] = {
"code": token,
"requested_token_type": "urn:ietf:params:oauth:token-type:saml2",
"grant_type": "authorization_code",
"scope": "openid",
"resource": self.client_id,
"client_id": self.client_id,
"client_secret": self.client_secret,
"redirect_uri": self.redirectUri,
}
_logger.debug("Uri: {}".format(url))
try:
response = requests.post(url, data=payload, headers=headers, verify=self.do_verify_ssl_cert())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if "response" in vars():
_logger.debug("Fetch_saml_response https response: {}".format(response.content)) # type: ignore
else:
_logger.debug("Fetch_saml_response could not receive https response due to an error")
_logger.error("Request for authentication from Microsoft was unsuccessful. {}".format(str(e)))
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when requesting authentication from Azure")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error(
"A error occurred when requesting authentication from Azure. Verify RedshiftProperties are correct"
)
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when requesting authentication from Azure")
raise InterfaceError(e)
_logger.debug(response.text)
try:
saml_assertion: str = response.json()["access_token"]
except TypeError as e:
_logger.error("Failed to decode saml assertion returned from Azure")
raise InterfaceError(e)
except KeyError as e:
_logger.error("Azure access_token was not found in saml assertion")
raise InterfaceError(e)
except Exception as e:
raise InterfaceError(e)
if saml_assertion == "":
raise InterfaceError("Azure access_token is empty")
missing_padding: int = 4 - len(saml_assertion) % 4
if missing_padding:
saml_assertion += "=" * missing_padding
return str(base64.urlsafe_b64decode(saml_assertion))