def fetch_saml_response()

in redshift_connector/plugin/browser_azure_credentials_provider.py [0:0]


    def fetch_saml_response(self: "BrowserAzureCredentialsProvider", token) -> str:
        """
        Exchange an OAuth authorization code for a SAML assertion from Azure.

        POSTs the authorization code to the tenant's ``/oauth2/token`` endpoint,
        reads ``access_token`` from the JSON response, restores any stripped
        base64 padding and base64url-decodes it.

        Parameters
        ----------
        token :
            The authorization code, sent as the ``code`` field of the POST body.

        Returns
        -------
        str
            ``str()`` of the decoded assertion *bytes*, i.e. the text still
            carries the ``b'...'`` wrapper. This shape is kept deliberately for
            backward compatibility; callers are not visible from this method and
            may rely on it.

        Raises
        ------
        InterfaceError
            If the HTTP request fails, the response is not valid JSON, the
            ``access_token`` field is missing, empty or not a string, or the
            token is not valid base64.
        """
        import requests

        url: str = "https://login.microsoftonline.com/{tenant}/oauth2/token".format(tenant=self.idp_tenant)
        # headers to pass with POST request
        headers: typing.Dict[str, str] = azure_headers

        # required parameters to pass in POST body
        payload: typing.Dict[str, typing.Optional[str]] = {
            "code": token,
            "requested_token_type": "urn:ietf:params:oauth:token-type:saml2",
            "grant_type": "authorization_code",
            "scope": "openid",
            "resource": self.client_id,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "redirect_uri": self.redirectUri,
        }

        _logger.debug("Uri: {}".format(url))

        # NOTE(review): no ``timeout=`` is passed, so requests may wait indefinitely
        # for the server; consider adding one once a suitable value is agreed on.
        try:
            response = requests.post(url, data=payload, headers=headers, verify=self.do_verify_ssl_cert())
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # Compare against None explicitly: a requests Response is falsy for
            # 4xx/5xx statuses, which is exactly the case being handled here.
            if e.response is not None:
                _logger.debug("Fetch_saml_response https response: {}".format(e.response.content))
            else:
                _logger.debug("Fetch_saml_response could not receive https response due to an error")
            _logger.error("Request for authentication from Microsoft was unsuccessful. {}".format(str(e)))
            raise InterfaceError(e) from e
        except requests.exceptions.Timeout as e:
            _logger.error("A timeout occurred when requesting authentication from Azure")
            raise InterfaceError(e) from e
        except requests.exceptions.TooManyRedirects as e:
            _logger.error(
                "A error occurred when requesting authentication from Azure. Verify RedshiftProperties are correct"
            )
            raise InterfaceError(e) from e
        except requests.exceptions.RequestException as e:
            _logger.error("A unknown error occurred when requesting authentication from Azure")
            raise InterfaceError(e) from e

        # NOTE(review): this writes the full token response (including the
        # access_token) to the debug log; consider redacting it.
        _logger.debug(response.text)

        try:
            saml_assertion: str = response.json()["access_token"]
        except (TypeError, ValueError) as e:
            # ValueError: body is not valid JSON. TypeError: the JSON document is
            # not an object (e.g. null or a list), so it cannot be indexed by key.
            _logger.error("Failed to decode saml assertion returned from Azure")
            raise InterfaceError(e) from e
        except KeyError as e:
            _logger.error("Azure access_token was not found in saml assertion")
            raise InterfaceError(e) from e
        except Exception as e:
            raise InterfaceError(e) from e

        # A JSON null (or any other falsy value) is treated the same as an empty token.
        if not saml_assertion:
            raise InterfaceError("Azure access_token is empty")
        if not isinstance(saml_assertion, str):
            raise InterfaceError("Azure access_token is not a string")

        # Restore stripped base64 padding. ``-len % 4`` yields 0..3, so an
        # already-aligned token gets no extra "=" characters appended.
        saml_assertion += "=" * (-len(saml_assertion) % 4)

        try:
            decoded: bytes = base64.urlsafe_b64decode(saml_assertion)
        except ValueError as e:
            # binascii.Error (malformed base64) is a ValueError subclass, as is the
            # error raised for non-ASCII input.
            _logger.error("Failed to base64-decode saml assertion returned from Azure")
            raise InterfaceError(e) from e

        return str(decoded)