in redshift_connector/plugin/okta_credentials_provider.py [0:0]
def okta_authentication(self: "OktaCredentialsProvider") -> str:
import requests
# HTTP Post request to Okta API for session token
url: str = "https://{host}/api/v1/authn".format(host=self.idp_host)
_logger.debug("Okta authentication request uri: {}".format(url))
headers: typing.Dict[str, str] = okta_headers
payload: typing.Dict[str, typing.Optional[str]] = {"username": self.user_name, "password": self.password}
_logger.debug("Okta authentication payload contains username={}".format(self.user_name))
try:
response: "requests.Response" = requests.post(
url, data=json.dumps(payload), headers=headers, verify=self.do_verify_ssl_cert()
)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if "response" in vars():
_logger.debug("Okta authentication response body: {}".format(response.content)) # type: ignore
else:
_logger.debug("Okta authentication response raised an exception. No response returned.")
_logger.error("Request for authentication from Okta was unsuccessful. {}".format(str(e)))
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when requesting authentication from Okta")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error(
"A error occurred when requesting authentication from Okta. Verify RedshiftProperties are correct"
)
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when requesting authentication from Okta")
raise InterfaceError(e)
# Retrieve and parse the Okta response for session token
if response is None:
raise InterfaceError("Request for authentication returned empty payload")
_logger.debug("Okta_authentication https response: {!r}".format(response.content))
response_payload: typing.Dict[str, typing.Any] = response.json()
if "status" not in response_payload:
_logger.debug("Status key not found in payload")
raise InterfaceError("Request for authentication retrieved malformed payload.")
elif response_payload["status"] != "SUCCESS":
_logger.debug("Status={} found in payload. Status must equal SUCCESS".format(response_payload["status"]))
raise InterfaceError("Request for authentication received non success response.")
else:
return str(response_payload["sessionToken"])