def okta_authentication()

in redshift_connector/plugin/okta_credentials_provider.py [0:0]


    def okta_authentication(self: "OktaCredentialsProvider") -> str:
        import requests

        # HTTP Post request to Okta API for session token
        url: str = "https://{host}/api/v1/authn".format(host=self.idp_host)
        _logger.debug("Okta authentication request uri: {}".format(url))
        headers: typing.Dict[str, str] = okta_headers
        payload: typing.Dict[str, typing.Optional[str]] = {"username": self.user_name, "password": self.password}
        _logger.debug("Okta authentication payload contains username={}".format(self.user_name))

        try:
            response: "requests.Response" = requests.post(
                url, data=json.dumps(payload), headers=headers, verify=self.do_verify_ssl_cert()
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if "response" in vars():
                _logger.debug("Okta authentication response body: {}".format(response.content))  # type: ignore
            else:
                _logger.debug("Okta authentication response raised an exception. No response returned.")
            _logger.error("Request for authentication from Okta was unsuccessful. {}".format(str(e)))
            raise InterfaceError(e)
        except requests.exceptions.Timeout as e:
            _logger.error("A timeout occurred when requesting authentication from Okta")
            raise InterfaceError(e)
        except requests.exceptions.TooManyRedirects as e:
            _logger.error(
                "A error occurred when requesting authentication from Okta. Verify RedshiftProperties are correct"
            )
            raise InterfaceError(e)
        except requests.exceptions.RequestException as e:
            _logger.error("A unknown error occurred when requesting authentication from Okta")
            raise InterfaceError(e)

        # Retrieve and parse the Okta response for session token
        if response is None:
            raise InterfaceError("Request for authentication returned empty payload")
        _logger.debug("Okta_authentication https response: {!r}".format(response.content))
        response_payload: typing.Dict[str, typing.Any] = response.json()

        if "status" not in response_payload:
            _logger.debug("Status key not found in payload")
            raise InterfaceError("Request for authentication retrieved malformed payload.")
        elif response_payload["status"] != "SUCCESS":
            _logger.debug("Status={} found in payload. Status must equal SUCCESS".format(response_payload["status"]))
            raise InterfaceError("Request for authentication received non success response.")
        else:
            return str(response_payload["sessionToken"])