def batch_create_users()

in source/idea/idea-cluster-manager/src/ideaclustermanager/cli/accounts.py [0:0]


def batch_create_users(path_to_csv: str, force: bool, generate_template: bool, template_path: str):
    """
    Create users in bulk from a csv file, or generate the csv template.

    When ``generate_template`` is set, the template is written under
    ``template_path`` (which must be an existing directory) and nothing else
    happens. Otherwise every row of ``path_to_csv`` is read using the
    'Username', 'Email' and 'Is Admin?' columns, validated, and - only if the
    whole file is clean - handed to ``ClusterManagerUtils.create_users``.

    :param path_to_csv: path of the csv file describing the users to create
    :param force: when True, skip the interactive confirmation prompt
    :param generate_template: when True, only generate the template file and return
    :param template_path: directory in which the template file is generated
    :raises SystemExit: with code 1 when any row has an invalid username/email,
        when a user is reported as a duplicate by
        ``ClusterManagerUtils.check_duplicate_users``, or when the confirmation
        prompt is declined
    :raises exceptions.SocaException: re-raised unchanged when a validation
        helper fails with an error code other than ``INVALID_PARAMS``
    """

    def _validation_message(validator, value) -> str:
        # Run one validator and return its INVALID_PARAMS message, or '' when
        # the value is accepted. Any other SocaException is not a validation
        # failure and is propagated with its original traceback.
        try:
            validator(value)
        except exceptions.SocaException as e:
            if e.error_code != errorcodes.INVALID_PARAMS:
                raise
            return e.message
        return ''

    context = ClusterManagerUtils.get_soca_cli_context_cluster_manager()

    if generate_template:
        # template generation is a standalone mode: never touches the csv file
        if template_path and Utils.is_dir(template_path):
            csv_file = ClusterManagerUtils.generate_template(template_path)
            context.info(f'The generated template file is located at {csv_file}')
        else:
            context.error('Path missing or incorrect; please provide a path to where to generate the "users_file_template.csv"')
        return

    ClusterManagerUtils.check_if_csv_file(path_to_csv, context)

    user_table = ClusterManagerUtils.build_confirm_user_table()
    status_table = ClusterManagerUtils.build_create_user_status_table()

    new_users = []
    new_usernames = []
    # one (username, email, is_admin, username_error, email_error) tuple per csv row
    csv_validations = []
    csv_contains_error = False

    # newline='' is what the csv module requires of file objects; without it,
    # newlines embedded inside quoted fields are not interpreted correctly.
    with open(path_to_csv, 'r', encoding="utf-8", newline='') as csv_file:
        csv_reader = csv.DictReader(csv_file)

        # check for valid csv file
        ClusterManagerUtils.check_has_valid_csv_headers(csv_reader.fieldnames, context)

        for row in csv_reader:
            new_username = AuthUtils.sanitize_username(Utils.get_value_as_string('Username', row))
            username_error = _validation_message(ClusterManagerUtils.check_valid_username, new_username)
            if not Utils.is_empty(username_error):
                csv_contains_error = True

            # DictReader fills the missing cells of a short row with None; fall
            # back to '' so the row is reported as an invalid email instead of
            # crashing on None.strip().
            new_email = (Utils.get_value_as_string('Email', row) or '').strip().lower()
            email_error = _validation_message(ClusterManagerUtils.check_valid_email, new_email)
            if not Utils.is_empty(email_error):
                csv_contains_error = True

            new_is_admin = Utils.get_value_as_bool('Is Admin?', row, default=False)

            new_usernames.append(new_username)
            csv_validations.append((new_username, new_email, Utils.get_as_string(new_is_admin, default='False'), username_error, email_error))

            user_table.add_row(new_username, new_email, Utils.get_as_string(new_is_admin))
            new_users.append(User(username=new_username, email=new_email, sudo=new_is_admin))

    # usernames repeated inside the csv itself are reported on their own, before
    # any per-row status is printed
    duplicate_users_in_csv = ClusterManagerUtils.check_duplicate_users_in_csv(new_usernames)
    if len(duplicate_users_in_csv) > 0:
        table = Table()
        table.add_column('Username', justify='left', no_wrap=False)
        for user in duplicate_users_in_csv:
            table.add_row(user)
        context.print(table)
        context.error('These users are repeated in the csv file; please ensure all usernames are unique in the csv file and try again.')
        return

    duplicate_users = ClusterManagerUtils.check_duplicate_users(new_usernames, context)

    if csv_contains_error or len(duplicate_users) > 0:
        # all-or-nothing: a single bad row aborts the whole batch; rows without
        # a problem of their own are listed as ABORTED
        for username, email, is_admin, username_error, email_error in csv_validations:
            is_duplicate = username in duplicate_users

            reasons = []
            if not Utils.is_empty(username_error):
                # an invalid username takes precedence over the duplicate notice
                reasons.append(username_error)
            elif is_duplicate:
                reasons.append('user already exists; please remove them from the csv and try again')
            if not Utils.is_empty(email_error):
                reasons.append(email_error)

            if reasons:
                status_table.add_row(username, email, is_admin, "FAILED", ' AND '.join(str(reason) for reason in reasons), style='red')
            else:
                status_table.add_row(username, email, is_admin, "ABORTED", 'csv file contains errors', style='bright_black')
        context.print(status_table)
        raise SystemExit(1)

    if not force:
        # show what is about to be created and ask for confirmation
        context.print(user_table)
        continue_deployment = context.prompt('Are you sure you want to add these users?')
        if not continue_deployment:
            for new_user in new_users:
                status_table.add_row(new_user.username, new_user.email, Utils.get_as_string(new_user.sudo, default='False'), 'ABORTED', 'Adding user Aborted!')
            context.print(status_table)
            raise SystemExit(1)

    create_users_response = ClusterManagerUtils.create_users(new_users, context)
    ClusterManagerUtils.print_create_user_status(create_users_response, context)