in source/idea/idea-cluster-manager/src/ideaclustermanager/app/accounts/helpers/single_sign_on_helper.py [0:0]
def create_or_update_user_pool_client(self, provider_name: str, refresh_token_validity_hours: int = None) -> Dict:
    """
    Create the Cognito user pool client used for single sign-on, or update it if one is already configured.

    Update path: when 'identity-provider.cognito.sso_client_id' is present in the cluster configuration,
    the existing client is updated in place with the freshly computed request (callback/logout urls,
    token validity, identity provider) and returned. The secret and cluster configuration are not touched.

    Create path: when no client id is configured, a new client is created with a generated secret.
    The client secret is stored in Secrets Manager under '<cluster_name>-sso-client-secret' (the secret
    is created if it does not exist, otherwise its value is overwritten), and the client id and secret
    ARN are written through ``_update_config_entry``.

    :param provider_name: name of the identity provider, sent as the only entry of
        'SupportedIdentityProviders'
    :param refresh_token_validity_hours: refresh token validity in hours. ``None``, values <= 0 and
        values > 87600 are replaced with 12.
    :return: the 'UserPoolClient' entry of the Cognito update/create response
    :raises botocore.exceptions.ClientError: re-raised when describing the secret fails with any error
        code other than 'ResourceNotFoundException'
    """
    user_pool_id = self.context.config().get_string('identity-provider.cognito.user_pool_id', required=True)
    existing_client_id = self.context.config().get_string('identity-provider.cognito.sso_client_id')

    # fall back to 12 hours whenever the caller did not supply a usable value
    validity_is_unusable = (
        refresh_token_validity_hours is None
        or refresh_token_validity_hours <= 0
        or refresh_token_validity_hours > 87600
    )
    if validity_is_unusable:
        refresh_token_validity_hours = 12

    callback_urls, logout_urls = self.get_callback_logout_urls()

    readable_attributes = [
        'address', 'birthdate',
        'custom:aws_region', 'custom:cluster_name',
        'custom:password_last_set', 'custom:password_max_age',
        'email', 'email_verified',
        'family_name', 'gender', 'given_name',
        'locale', 'middle_name', 'name', 'nickname',
        'phone_number', 'phone_number_verified',
        'picture', 'preferred_username', 'profile',
        'updated_at', 'website', 'zoneinfo',
    ]
    client_request = {
        'UserPoolId': user_pool_id,
        'ClientName': DEFAULT_USER_POOL_CLIENT_NAME,
        'AccessTokenValidity': 1,
        'IdTokenValidity': 1,
        'RefreshTokenValidity': refresh_token_validity_hours,
        'TokenValidityUnits': {'AccessToken': 'hours', 'IdToken': 'hours', 'RefreshToken': 'hours'},
        'ReadAttributes': readable_attributes,
        'AllowedOAuthFlows': ['code'],
        'AllowedOAuthScopes': ['email', 'openid', 'aws.cognito.signin.user.admin'],
        'CallbackURLs': callback_urls,
        'LogoutURLs': logout_urls,
        'SupportedIdentityProviders': [provider_name],
        'AllowedOAuthFlowsUserPoolClient': True,
    }

    # a client already exists: update it in place and stop here
    if Utils.is_not_empty(existing_client_id):
        client_request['ClientId'] = existing_client_id
        update_response = self.context.aws().cognito_idp().update_user_pool_client(**client_request)
        return update_response['UserPoolClient']

    # no client yet: create one and let cognito generate the client secret
    client_request['GenerateSecret'] = True
    create_response = self.context.aws().cognito_idp().create_user_pool_client(**client_request)
    new_client = create_response['UserPoolClient']

    # custom kms key id for secrets manager, if one is configured
    kms_key_id = self.config.get_string('cluster.secretsmanager.kms_key_id')
    secret_name = f'{self.cluster_name}-sso-client-secret'
    secret_tags = [
        {'Key': constants.IDEA_TAG_ENVIRONMENT_NAME, 'Value': self.cluster_name},
        {'Key': constants.IDEA_TAG_MODULE_NAME, 'Value': constants.MODULE_CLUSTER_MANAGER},
    ]

    # look up the secret by name; a missing secret is expected on first creation
    try:
        existing_secret = self.context.aws().secretsmanager().describe_secret(SecretId=secret_name)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] != 'ResourceNotFoundException':
            raise error
        existing_secret = None

    secret_is_new = existing_secret is None
    if secret_is_new:
        secret_request = {
            'Name': secret_name,
            'Description': f'Single Sign-On OAuth2 Client Secret for Cluster: {self.cluster_name}',
            'Tags': secret_tags,
            'SecretString': new_client['ClientSecret'],
        }
    else:
        secret_request = {
            'SecretId': existing_secret['ARN'],
            'SecretString': new_client['ClientSecret'],
        }

    # KmsKeyId is only added when available; the original note says the boto client
    # throws a validation exception when it is passed as None
    if Utils.is_not_empty(kms_key_id):
        secret_request['KmsKeyId'] = kms_key_id

    secrets_manager = self.context.aws().secretsmanager()
    store_secret = secrets_manager.create_secret if secret_is_new else secrets_manager.update_secret
    secret_arn = store_secret(**secret_request)['ARN']

    # persist the new client id and the arn of its secret
    self._update_config_entry('cognito.sso_client_id', new_client['ClientId'])
    self._update_config_entry('cognito.sso_client_secret', secret_arn)
    return new_client