def link_existing_users()

in source/idea/idea-cluster-manager/src/ideaclustermanager/app/accounts/helpers/single_sign_on_helper.py [0:0]


    def link_existing_users(self):
        """
        if there are existing users in the user pool, added prior to enabling SSO, link all the existing users with IDP.
        this ensures SSO works for existing users that were created before enabling IDP.

        system administration users such as clusteradmin are not linked with IDP with an assumption that no such users will exist
        in corporate directory service.

        when SSO is enabled, accessing the cluster endpoint will result in signing-in automatically.
        to ensure clusteradmin user can sign-in, a special query parameter can be provided:
        `sso=False`
        sending this parameter as part of query string will not trigger the sso flow from the web portal.
        """
        user_pool_id = self.context.config().get_string('identity-provider.cognito.user_pool_id', required=True)
        provider_name = self.context.config().get_string('identity-provider.cognito.sso_idp_provider_name', required=True)
        provider_type = self.context.config().get_string('identity-provider.cognito.sso_idp_provider_type', required=True)
        cluster_admin_username = self.context.config().get_string('cluster.administrator_username', required=True)

        if provider_type == constants.SSO_IDP_PROVIDER_OIDC:
            provider_email_attribute = 'email'
        else:
            provider_email_attribute = self.context.config().get_string('identity-provider.cognito.sso_idp_provider_email_attribute', required=True)

        while True:
            list_users_result = self.context.aws().cognito_idp().list_users(
                UserPoolId=user_pool_id
            )
            users = list_users_result['Users']
            for user in users:
                try:
                    if user['UserStatus'] == 'EXTERNAL_PROVIDER':
                        continue

                    username = user['Username']

                    # exclude system administration users
                    if username in cluster_admin_username or username.startswith('clusteradmin'):
                        self.logger.info(f'system administration user found: {username}. skip linking with IDP.')
                        continue

                    email = None
                    already_linked = False
                    user_attributes = Utils.get_value_as_list('Attributes', user, [])
                    for user_attribute in user_attributes:
                        name = user_attribute['Name']
                        if name == 'email':
                            email = Utils.get_value_as_string('Value', user_attribute)
                        elif name == 'identities':
                            identities = Utils.get_value_as_list('Value', user_attribute, [])
                            for identity in identities:
                                if identity['providerName'] == provider_name:
                                    already_linked = True
                    if Utils.is_empty(email):
                        continue
                    if already_linked:
                        self.logger.info(f'user: {username}, email: {email} already linked. skip.')
                        continue

                    self.logger.info(f'linking user: {username}, email: {email} ...')

                    def admin_link_provider_for_user(**kwargs):
                        self.logger.info(f'link request: {Utils.to_json(kwargs)}')
                        self.context.aws().cognito_idp().admin_link_provider_for_user(**kwargs)

                    admin_link_provider_for_user(
                        UserPoolId=user_pool_id,
                        DestinationUser={
                            'ProviderName': 'Cognito',
                            'ProviderAttributeName': 'cognito:username',
                            'ProviderAttributeValue': user['Username']
                        },
                        SourceUser={
                            'ProviderName': provider_name,
                            'ProviderAttributeName': provider_email_attribute,
                            'ProviderAttributeValue': email
                        }
                    )
                    # sleep for a while to avoid flooding aws with these requests.
                    time.sleep(0.2)
                except Exception as e:
                    self.logger.error(f'failed to link user: {user} with IDP: {provider_name} - {e}')

            pagination_token = Utils.get_value_as_string('PaginationToken', list_users_result)
            if Utils.is_empty(pagination_token):
                break