in redshift_connector/plugin/adfs_credentials_provider.py [0:0]
def form_based_authentication(self: "AdfsCredentialsProvider") -> str:
import bs4 # type: ignore
import requests
url: str = "https://{host}:{port}/adfs/ls/IdpInitiatedSignOn.aspx?loginToRp=urn:amazon:webservices".format(
host=self.idp_host, port=str(self.idpPort)
)
_logger.debug("Uri: {}".format(url))
try:
response: "requests.Response" = requests.get(url, verify=self.do_verify_ssl_cert())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if "response" in vars():
_logger.debug("Form_based_authentication https response: {}".format(response.content)) # type: ignore
else:
_logger.debug("Form_based_authentication could not receive https response due to an error")
_logger.error("Request for SAML assertion when refreshing credentials was unsuccessful. {}".format(str(e)))
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when requesting SAML assertion")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error(
"A error occurred when requesting SAML assertion to refresh credentials. "
"Verify RedshiftProperties are correct"
)
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when requesting SAML assertion to refresh credentials")
raise InterfaceError(e)
_logger.debug(response.text)
try:
soup = bs4.BeautifulSoup(response.text, features="lxml")
except Exception as e:
_logger.error("An error occurred while parsing response: {}".format(str(e)))
raise InterfaceError(e)
payload: typing.Dict[str, typing.Optional[str]] = {}
for inputtag in soup.find_all(re.compile("(INPUT|input)")):
name: str = inputtag.get("name", "")
value: str = inputtag.get("value", "")
_logger.debug("Name={}".format(name))
if "username" in name.lower():
payload[name] = self.user_name
elif "authmethod" in name.lower():
payload[name] = value
elif "password" in name.lower():
payload[name] = self.password
elif name != "":
payload[name] = value
action: typing.Optional[str] = self.get_form_action(soup)
if action and action.startswith("/"):
url = "https://{host}:{port}{action}".format(host=self.idp_host, port=str(self.idpPort), action=action)
_logger.debug("Action uri: {}".format(url))
try:
response = requests.post(url, data=payload, verify=self.do_verify_ssl_cert())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
_logger.error("Request to refresh credentials was unsuccessful. {}".format(str(e)))
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when attempting to refresh credentials")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error("A error occurred when refreshing credentials. Verify RedshiftProperties are correct")
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when refreshing credentials")
raise InterfaceError(e)
try:
soup = bs4.BeautifulSoup(response.text, features="lxml")
except Exception as e:
_logger.error("An error occurred while parsing response: {}".format(str(e)))
raise InterfaceError(e)
assertion: str = ""
for inputtag in soup.find_all("input"):
if inputtag.get("name") == "SAMLResponse":
assertion = inputtag.get("value")
if assertion == "":
raise InterfaceError("Failed to find Adfs access_token")
return assertion