def get_saml_assertion()

in redshift_connector/plugin/ping_credentials_provider.py [0:0]


    def get_saml_assertion(self: "PingCredentialsProvider") -> str:
        """
        Log in to the Ping identity provider over HTTPS and return the SAML assertion.

        The flow is:

        1. GET the ``startSSO.ping`` page for ``self.idp_host`` / ``self.idpPort``.
        2. Scan the returned HTML for input tags and build a form payload, filling
           the username field with ``self.user_name`` and the password field with
           ``self.password``; every other named input keeps the value from the page.
        3. POST the payload to the form's action (when it is a path starting with
           ``/``) or back to the original URL otherwise.
        4. Return the ``value`` of the input named ``SAMLResponse`` in the reply.

        Side effect: when ``self.partner_sp_id`` is ``None`` it is set to
        ``"urn%3Aamazon%3Awebservices"`` before the URL is built.

        Returns
        -------
        str
            The non-empty ``SAMLResponse`` value taken from the identity provider's reply.

        Raises
        ------
        InterfaceError
            If either HTTP request fails, a response cannot be parsed, the login
            form lacks a recognizable username or password field, the form has
            more than one password field, or no non-empty ``SAMLResponse`` value
            is present in the final response.
        """
        import bs4  # type: ignore
        import requests

        self.check_required_parameters()

        # Compiled once and reused for every scan of the login page's input tags.
        input_tag_pattern = re.compile("(INPUT|input)")

        with requests.Session() as session:
            if self.partner_sp_id is None:
                self.partner_sp_id = "urn%3Aamazon%3Awebservices"

            url: str = "https://{host}:{port}/idp/startSSO.ping?PartnerSpId={sp_id}".format(
                host=self.idp_host, port=str(self.idpPort), sp_id=self.partner_sp_id
            )
            _logger.debug("Uri: {}".format(url))

            try:
                response: "requests.Response" = session.get(url, verify=self.do_verify_ssl_cert())
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                # The exception carries the failed response (if any), so there is
                # no need to probe the local namespace for it.
                if e.response is not None:
                    _logger.debug("Get_saml_assertion https response: {}".format(e.response.content))
                else:
                    _logger.debug("Get_saml_assertion could not receive https response due to an error")
                _logger.error(
                    "Request for SAML assertion when refreshing credentials was unsuccessful. {}".format(str(e))
                )
                raise InterfaceError(e) from e
            except requests.exceptions.Timeout as e:
                _logger.error("A timeout occurred when requesting SAML assertion")
                raise InterfaceError(e) from e
            except requests.exceptions.TooManyRedirects as e:
                _logger.error(
                    "A error occurred when requesting SAML assertion to refresh credentials. "
                    "Verify RedshiftProperties are correct"
                )
                raise InterfaceError(e) from e
            except requests.exceptions.RequestException as e:
                _logger.error("A unknown error occurred when requesting SAML assertion to refresh credentials")
                raise InterfaceError(e) from e

            _logger.debug(response.content)

            try:
                # NOTE(review): no parser is named here, so bs4 picks whichever one
                # is installed; pin one explicitly if parsing must be reproducible.
                soup = bs4.BeautifulSoup(response.text)
            except Exception as e:
                _logger.error("An error occurred while parsing response: {}".format(str(e)))
                raise InterfaceError(e) from e

            payload: typing.Dict[str, typing.Optional[str]] = {}
            username: bool = False
            pwd: bool = False

            # First pass: prefer a text input whose id is exactly "username", pick
            # up the password field, and carry every other named input unchanged.
            for inputtag in soup.find_all(input_tag_pattern):
                name: str = inputtag.get("name", "")
                input_id: str = inputtag.get("id", "")
                value: str = inputtag.get("value", "")
                _logger.debug("Name={} , id={}".format(name, input_id))

                if username is False and self.is_text(inputtag) and input_id == "username":
                    payload[name] = self.user_name
                    username = True
                elif self.is_password(inputtag) and ("pass" in name):
                    if pwd is True:
                        raise InterfaceError("Duplicate password fields on login page.")
                    payload[name] = self.password
                    pwd = True
                elif name != "":
                    payload[name] = value

            # Fallback: no id="username" field was found, so treat any text input
            # whose name mentions "user" or "email" as the username field.
            if username is False:
                for inputtag in soup.find_all(input_tag_pattern):
                    name = inputtag.get("name", "")
                    if self.is_text(inputtag) and ("user" in name or "email" in name):
                        payload[name] = self.user_name
                        username = True

            if (username is False) or (pwd is False):
                _logger.debug(soup.find_all(input_tag_pattern))
                raise InterfaceError("Failed to parse login form.")

            # Only a form action that is a path starting with "/" replaces the
            # target; otherwise the POST goes back to the URL used for the GET.
            action: typing.Optional[str] = self.get_form_action(soup)
            if action and action.startswith("/"):
                url = "https://{host}:{port}{action}".format(host=self.idp_host, port=str(self.idpPort), action=action)
            _logger.debug("Action uri: {}".format(url))

            try:
                response = session.post(url, data=payload, verify=self.do_verify_ssl_cert())
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                _logger.error("Request to refresh credentials was unsuccessful. {}".format(str(e)))
                raise InterfaceError(e) from e
            except requests.exceptions.Timeout as e:
                _logger.error("A timeout occurred when attempting to refresh credentials")
                raise InterfaceError(e) from e
            except requests.exceptions.TooManyRedirects as e:
                _logger.error("A error occurred when refreshing credentials. Verify RedshiftProperties are correct")
                raise InterfaceError(e) from e
            except requests.exceptions.RequestException as e:
                _logger.error("A unknown error occurred when refreshing credentials")
                raise InterfaceError(e) from e

            try:
                soup = bs4.BeautifulSoup(response.text)
            except Exception as e:
                _logger.error("An error occurred while parsing SAML response: {}".format(str(e)))
                raise InterfaceError(e) from e

            assertion: str = ""
            for inputtag in soup.find_all("input"):
                if inputtag.get("name") == "SAMLResponse":
                    # Default to "" so a SAMLResponse input without a value
                    # attribute is rejected below instead of leaking None.
                    assertion = inputtag.get("value", "")

            if not assertion:
                raise InterfaceError("Failed to retrieve SAMLAssertion.")

            return assertion