in redshift_connector/plugin/azure_credentials_provider.py [0:0]
def azure_oauth_based_authentication(self: "AzureCredentialsProvider") -> str:
import requests
# endpoint to connect with Microsoft Azure to get SAML Assertion token
url: str = "https://login.microsoftonline.com/{tenant}/oauth2/token".format(tenant=self.idp_tenant)
_logger.debug("Uri: {}".format(url))
# headers to pass with POST request
headers: typing.Dict[str, str] = azure_headers
# required parameters to pass in POST body
payload: typing.Dict[str, typing.Optional[str]] = {
"grant_type": "password",
"requested_token_type": "urn:ietf:params:oauth:token-type:saml2",
"username": self.user_name,
"password": self.password,
"client_secret": self.client_secret,
"client_id": self.client_id,
"resource": self.client_id,
}
try:
response: "requests.Response" = requests.post(
url, data=payload, headers=headers, verify=self.do_verify_ssl_cert()
)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if "response" in vars():
_logger.debug(
"azure_oauth_based_authentication https response: {}".format(response.content) # type: ignore
)
else:
_logger.debug("Azure_oauth_based_authentication could not receive https response due to an error")
_logger.error("Request for authentication from Azure was unsuccessful. {}".format(str(e)))
raise InterfaceError(e)
except requests.exceptions.Timeout as e:
_logger.error("A timeout occurred when requesting authentication from Azure")
raise InterfaceError(e)
except requests.exceptions.TooManyRedirects as e:
_logger.error(
"A error occurred when requesting authentication from Azure. Verify RedshiftProperties are correct"
)
raise InterfaceError(e)
except requests.exceptions.RequestException as e:
_logger.error("A unknown error occurred when requesting authentication from Azure.")
raise InterfaceError(e)
_logger.debug(response.text)
# parse the JSON response to grab access_token field which contains Base64 encoded SAML
# Assertion and decode it
saml_assertion: str = ""
try:
saml_assertion = response.json()["access_token"]
except Exception as e:
_logger.error("Failed to authenticate with Azure. Response from Azure did not include access_token.")
raise InterfaceError(e)
if saml_assertion == "":
raise InterfaceError("Azure access_token is empty")
missing_padding: int = 4 - len(saml_assertion) % 4
if missing_padding:
saml_assertion += "=" * missing_padding
# decode the SAML Assertion to a String to add XML tags to form a SAML Response
decoded_saml_assertion: str = ""
try:
decoded_saml_assertion = str(base64.urlsafe_b64decode(saml_assertion))
except TypeError as e:
_logger.error("Failed to decode saml assertion returned from Azure")
raise InterfaceError(e)
# SAML Response is required to be sent to base class. We need to provide a minimum of:
# 1) samlp:Response XML tag with xmlns:samlp protocol value
# 2) samlp:Status XML tag and samlpStatusCode XML tag with Value indicating Success
# 3) followed by Signed SAML Assertion
saml_response: str = (
'<samlp:Response xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol">'
"<samlp:Status>"
'<samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success"/>'
"</samlp:Status>"
"{decoded_saml_assertion}"
"</samlp:Response>".format(decoded_saml_assertion=decoded_saml_assertion[2:-1])
)
# re-encode the SAML Response in Base64 and return this to the base class
saml_response = str(base64.b64encode(saml_response.encode("utf-8")))[2:-1]
return saml_response