in redshift_connector/plugin/ping_credentials_provider.py [0:0]
def get_saml_assertion(self: "PingCredentialsProvider") -> str:
import bs4 # type: ignore
import requests
self.check_required_parameters()
with requests.Session() as session:
if self.partner_sp_id is None:
self.partner_sp_id = "urn%3Aamazon%3Awebservices"
url: str = "https://{host}:{port}/idp/startSSO.ping?PartnerSpId={sp_id}".format(
host=self.idp_host, port=str(self.idpPort), sp_id=self.partner_sp_id
)
_logger.debug("Uri: {}".format(url))
try:
response: "requests.Response" = session.get(url, verify=self.do_verify_ssl_cert())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if "response" in vars():
_logger.debug("Get_saml_assertion https response: {}".format(response.content)) # type: ignore
else:
_logger.debug("Get_saml_assertion could not receive https response due to an error")
_logger.error(
"Request for SAML assertion when refreshing credentials was unsuccessful. {}".format(str(e))
)
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when requesting SAML assertion")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error(
"A error occurred when requesting SAML assertion to refresh credentials. "
"Verify RedshiftProperties are correct"
)
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when requesting SAML assertion to refresh credentials")
raise InterfaceError(e)
_logger.debug(response.content)
try:
soup = bs4.BeautifulSoup(response.text)
except Exception as e:
_logger.error("An error occurred while parsing response: {}".format(str(e)))
raise InterfaceError(e)
payload: typing.Dict[str, typing.Optional[str]] = {}
username: bool = False
pwd: bool = False
for inputtag in soup.find_all(re.compile("(INPUT|input)")):
name: str = inputtag.get("name", "")
id: str = inputtag.get("id", "")
value: str = inputtag.get("value", "")
_logger.debug("Name={} , id={}".format(name, id))
if username is False and self.is_text(inputtag) and id == "username":
payload[name] = self.user_name
username = True
elif self.is_password(inputtag) and ("pass" in name):
if pwd is True:
raise InterfaceError("Duplicate password fields on login page.")
payload[name] = self.password
pwd = True
elif name != "":
payload[name] = value
if username is False:
for inputtag in soup.find_all(re.compile("(INPUT|input)")):
name = inputtag.get("name", "")
if self.is_text(inputtag) and ("user" in name or "email" in name):
payload[name] = self.user_name
username = True
if (username is False) or (pwd is False):
_logger.debug(soup.find_all(re.compile("(INPUT|input)")))
raise InterfaceError("Failed to parse login form.")
action: typing.Optional[str] = self.get_form_action(soup)
if action and action.startswith("/"):
url = "https://{host}:{port}{action}".format(host=self.idp_host, port=str(self.idpPort), action=action)
_logger.debug("Action uri: {}".format(url))
try:
response = session.post(url, data=payload, verify=self.do_verify_ssl_cert())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
_logger.error("Request to refresh credentials was unsuccessful. {}".format(str(e)))
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when attempting to refresh credentials")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error("A error occurred when refreshing credentials. Verify RedshiftProperties are correct")
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when refreshing credentials")
raise InterfaceError(e)
try:
soup = bs4.BeautifulSoup(response.text)
except Exception as e:
_logger.error("An error occurred while parsing SAML response: {}".format(str(e)))
raise InterfaceError(e)
assertion: str = ""
for inputtag in soup.find_all("input"):
if inputtag.get("name") == "SAMLResponse":
assertion = inputtag.get("value")
if assertion == "":
raise InterfaceError("Failed to retrieve SAMLAssertion.")
return assertion