in source/idea/idea-cluster-manager/src/ideaclustermanager/app/accounts/helpers/single_sign_on_helper.py [0:0]
def link_existing_users(self):
    """
    Link pre-existing Cognito user pool users with the configured SSO identity provider (IDP).

    if there are existing users in the user pool, added prior to enabling SSO, link all the existing users with IDP.
    this ensures SSO works for existing users that were created before enabling IDP.

    system administration users such as clusteradmin are not linked with IDP with an assumption that no such users
    will exist in corporate directory service.

    when SSO is enabled, accessing the cluster endpoint will result in signing-in automatically.
    to ensure clusteradmin user can sign-in, a special query parameter can be provided:
        `sso=False`
    sending this parameter as part of query string will not trigger the sso flow from the web portal.

    Users are skipped when they:
      * have UserStatus == 'EXTERNAL_PROVIDER',
      * are the configured cluster administrator, or have a username starting with 'clusteradmin',
      * have no email attribute,
      * already carry an identity whose providerName equals the configured provider name.

    Linking is best-effort per user: a failure for one user is logged and processing continues with the next
    user. Errors raised by the `list_users` call itself are not caught here and propagate to the caller.
    """
    config = self.context.config()
    user_pool_id = config.get_string('identity-provider.cognito.user_pool_id', required=True)
    provider_name = config.get_string('identity-provider.cognito.sso_idp_provider_name', required=True)
    provider_type = config.get_string('identity-provider.cognito.sso_idp_provider_type', required=True)
    cluster_admin_username = config.get_string('cluster.administrator_username', required=True)

    # OIDC providers are matched on the fixed 'email' attribute; any other provider type reads the
    # attribute name from configuration.
    if provider_type == constants.SSO_IDP_PROVIDER_OIDC:
        provider_email_attribute = 'email'
    else:
        provider_email_attribute = config.get_string('identity-provider.cognito.sso_idp_provider_email_attribute', required=True)

    cognito_idp = self.context.aws().cognito_idp()

    # defined once (not per user): logs the request payload before issuing the link call.
    def admin_link_provider_for_user(**kwargs):
        self.logger.info(f'link request: {Utils.to_json(kwargs)}')
        cognito_idp.admin_link_provider_for_user(**kwargs)

    list_users_request = {'UserPoolId': user_pool_id}
    while True:
        list_users_result = cognito_idp.list_users(**list_users_request)
        users = Utils.get_value_as_list('Users', list_users_result, [])

        for user in users:
            try:
                if user['UserStatus'] == 'EXTERNAL_PROVIDER':
                    continue

                username = user['Username']

                # exclude system administration users.
                # equality (not substring containment) so that a regular user whose name merely happens to be
                # a substring of the administrator username is still linked.
                if username == cluster_admin_username or username.startswith('clusteradmin'):
                    self.logger.info(f'system administration user found: {username}. skip linking with IDP.')
                    continue

                email = None
                already_linked = False
                user_attributes = Utils.get_value_as_list('Attributes', user, [])
                for user_attribute in user_attributes:
                    name = user_attribute['Name']
                    if name == 'email':
                        email = Utils.get_value_as_string('Value', user_attribute)
                    elif name == 'identities':
                        # NOTE(review): Cognito may return 'identities' as a JSON-encoded string; confirm that
                        # Utils.get_value_as_list yields a list of dicts here - TODO confirm.
                        identities = Utils.get_value_as_list('Value', user_attribute, [])
                        if any(identity['providerName'] == provider_name for identity in identities):
                            already_linked = True

                if Utils.is_empty(email):
                    continue

                if already_linked:
                    self.logger.info(f'user: {username}, email: {email} already linked. skip.')
                    continue

                self.logger.info(f'linking user: {username}, email: {email} ...')
                admin_link_provider_for_user(
                    UserPoolId=user_pool_id,
                    DestinationUser={
                        'ProviderName': 'Cognito',
                        'ProviderAttributeName': 'cognito:username',
                        'ProviderAttributeValue': username
                    },
                    SourceUser={
                        'ProviderName': provider_name,
                        'ProviderAttributeName': provider_email_attribute,
                        'ProviderAttributeValue': email
                    }
                )

                # sleep for a while to avoid flooding aws with these requests.
                time.sleep(0.2)
            except Exception as e:
                # deliberate best-effort: one bad user record must not abort linking for everyone else.
                self.logger.error(f'failed to link user: {user} with IDP: {provider_name} - {e}')

        pagination_token = Utils.get_value_as_string('PaginationToken', list_users_result)
        if Utils.is_empty(pagination_token):
            break

        # feed the token into the next request; without this the same first page would be fetched forever.
        list_users_request['PaginationToken'] = pagination_token