def main()

in Cognito/identity-pool-integrator-onelogin/identitypool_integrator_onelogin.py [0:0]


def main():
    # Get and parse the values passed in arguments to use in the integration tester.
    parser = argparse.ArgumentParser(description='Cognito Identity Pool and OneLogin SAML Authentication Integrator')

    parser.add_argument('-d','--debugflag', help='Enter Y to enable debugging. This will print the credentials into STDOUT', type=str)
    parser.add_argument('-e','--emailorusername', required=True, help='Enter email to login/username to your User Pool', type=str)
    parser.add_argument('-p','--password',required=True, help='Enter password to login to your User Pool', type=str)
    parser.add_argument('-a','--appid', required=True ,help='Enter your Cognito App Id', type=str)
    parser.add_argument('-s','--subdomain', required=True, help='The name of the subdomain that got created when you created the OneLogin account', type=str)
    parser.add_argument('-i','--identityprovidername', required=True, help='This tag to be filled below is the IAM SAML Identity Provider name of the Identity Provider we have created for OneLogin', type=str)
    parser.add_argument('-c','--accountid', required=True, help='Account ID of your AWS Account', type=str)
    parser.add_argument('-t','--identitypoolid', required=True, help='ID of your Cognito Identity Pool', type=str)

    args = vars(parser.parse_args())

    debugflag = args.get("debugflag")

    if debugflag is None:
        debugflag == "N"

    emailorusername = args["emailorusername"]
    password = args["password"]
    appid = args["appid"]
    subdomain = args["subdomain"]
    identityprovidername = args["identityprovidername"]
    accountid = args["accountid"]
    identitypoolid = args["identitypoolid"]

    try:
        # Get the OneLogin App Client ID and App Client Secret. This is stored in AWS systems manager(ssm).
        ssm_client = boto3.client("ssm")

        app_credentials = ssm_client.get_parameter(Name="OneLoginAppCredentials")
        appclientidonelogin, appclientsecretonelogin = app_credentials["Parameter"]["Value"].split(",")

        #First Call to get the access token Reference - https://developers.onelogin.com/api-docs/1/oauth20-tokens/generate-tokens-2
        get_oauth_tokens = requests.post('https://api.us.onelogin.com/auth/oauth2/v2/token',
          auth=(appclientidonelogin,appclientsecretonelogin),
          json={
            "grant_type": "client_credentials"
          }
        )

        get_oauth_tokens = get_oauth_tokens.json()

        if debugflag == "Y":
            print("The OneLogin Access Token is ",get_oauth_tokens['access_token'])
        else:
            print("The OneLogin Access Token has been obtained successfully")

        print("-----------------------------------------------")

        access_token = get_oauth_tokens['access_token']

        #Second Call to get the SAML response token - https://developers.onelogin.com/api-docs/1/saml-assertions/generate-saml-assertion

        payload = {
          "username_or_email": emailorusername,
          "password": password,
          "app_id": appid,
          "subdomain":subdomain #The name of the subdomain that got created when you created the OneLogin account.
        }

        headers = {'Authorization': 'bearer:'+access_token,'Content-Type' : 'application/json'}

        get_saml_assertion = requests.post(url = 'https://api.us.onelogin.com/api/1/saml_assertion',headers = headers,data=json.dumps(payload))

        saml_assertion = get_saml_assertion.json()["data"]

        if debugflag == "Y":
            print("The SAML Assertion from OneLogin is ", saml_assertion)
        else:
            print("The SAML assertion has been obtained successfully")

        print("-----------------------------------------------")

        account_id,identity_pool_id = accountid,identitypoolid

        identity = boto3.client('cognito-identity')

        #Third Call to get the identity-id using the cognito get-id call

        get_identity_id = identity.get_id(AccountId=account_id, IdentityPoolId=identity_pool_id,Logins={identityprovidername:saml_assertion})

        identity_id = get_identity_id['IdentityId']

        #Fourth Call to get the AWS temporary credentials

        get_temporary_aws_credentials = identity.get_credentials_for_identity(IdentityId=identity_id,Logins={identityprovidername:saml_assertion})

        if debugflag == "Y":
            print("The temporary AWS credentials are ", get_temporary_aws_credentials)
        else:
            print("The temporary AWS credentials have been obtained successfully")

        print("-----------------------------------------------")

    except Exception as e:
        print("Error occured")
        traceback.print_exc(file=sys.stdout)