def form_based_authentication()

in redshift_connector/plugin/adfs_credentials_provider.py [0:0]


    def form_based_authentication(self: "AdfsCredentialsProvider") -> str:
        import bs4  # type: ignore
        import requests

        url: str = "https://{host}:{port}/adfs/ls/IdpInitiatedSignOn.aspx?loginToRp=urn:amazon:webservices".format(
            host=self.idp_host, port=str(self.idpPort)
        )
        _logger.debug("Uri: {}".format(url))

        try:
            response: "requests.Response" = requests.get(url, verify=self.do_verify_ssl_cert())
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if "response" in vars():
                _logger.debug("Form_based_authentication https response: {}".format(response.content))  # type: ignore
            else:
                _logger.debug("Form_based_authentication could not receive https response due to an error")
            _logger.error("Request for SAML assertion when refreshing credentials was unsuccessful. {}".format(str(e)))
            raise InterfaceError(e)
        except requests.exceptions.Timeout as e:
            _logger.error("A timeout occurred when requesting SAML assertion")
            raise InterfaceError(e)
        except requests.exceptions.TooManyRedirects as e:
            _logger.error(
                "A error occurred when requesting SAML assertion to refresh credentials. "
                "Verify RedshiftProperties are correct"
            )
            raise InterfaceError(e)
        except requests.exceptions.RequestException as e:
            _logger.error("A unknown error occurred when requesting SAML assertion to refresh credentials")
            raise InterfaceError(e)

        _logger.debug(response.text)

        try:
            soup = bs4.BeautifulSoup(response.text, features="lxml")
        except Exception as e:
            _logger.error("An error occurred while parsing response: {}".format(str(e)))
            raise InterfaceError(e)

        payload: typing.Dict[str, typing.Optional[str]] = {}

        for inputtag in soup.find_all(re.compile("(INPUT|input)")):
            name: str = inputtag.get("name", "")
            value: str = inputtag.get("value", "")

            _logger.debug("Name={}".format(name))

            if "username" in name.lower():
                payload[name] = self.user_name
            elif "authmethod" in name.lower():
                payload[name] = value
            elif "password" in name.lower():
                payload[name] = self.password
            elif name != "":
                payload[name] = value

        action: typing.Optional[str] = self.get_form_action(soup)
        if action and action.startswith("/"):
            url = "https://{host}:{port}{action}".format(host=self.idp_host, port=str(self.idpPort), action=action)

        _logger.debug("Action uri: {}".format(url))

        try:
            response = requests.post(url, data=payload, verify=self.do_verify_ssl_cert())
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            _logger.error("Request to refresh credentials was unsuccessful. {}".format(str(e)))
            raise InterfaceError(e)
        except requests.exceptions.Timeout as e:
            _logger.error("A timeout occurred when attempting to refresh credentials")
            raise InterfaceError(e)
        except requests.exceptions.TooManyRedirects as e:
            _logger.error("A error occurred when refreshing credentials. Verify RedshiftProperties are correct")
            raise InterfaceError(e)
        except requests.exceptions.RequestException as e:
            _logger.error("A unknown error occurred when refreshing credentials")
            raise InterfaceError(e)

        try:
            soup = bs4.BeautifulSoup(response.text, features="lxml")
        except Exception as e:
            _logger.error("An error occurred while parsing response: {}".format(str(e)))
            raise InterfaceError(e)
        assertion: str = ""

        for inputtag in soup.find_all("input"):
            if inputtag.get("name") == "SAMLResponse":
                assertion = inputtag.get("value")

        if assertion == "":
            raise InterfaceError("Failed to find Adfs access_token")

        return assertion