in source/idea/idea-cluster-manager/src/ideaclustermanager/cli/accounts.py [0:0]
def batch_create_users(path_to_csv: str, force: bool, generate_template: bool, template_path: str):
"""
creates users from csv file
"""
context = ClusterManagerUtils.get_soca_cli_context_cluster_manager()
if generate_template:
# generate template
if template_path and Utils.is_dir(template_path):
csv_file = ClusterManagerUtils.generate_template(template_path)
context.info(f'The generated template file is located at {csv_file}')
else:
context.error('Path missing or incorrect; please provide a path to where to generate the "users_file_template.csv"')
return
ClusterManagerUtils.check_if_csv_file(path_to_csv, context)
user_table = ClusterManagerUtils.build_confirm_user_table()
status_table = ClusterManagerUtils.build_create_user_status_table()
new_users = []
new_usernames = []
csv_validations = []
csv_contains_error = False
csv_contains_duplicate = False
with open(path_to_csv, 'r', encoding="utf-8") as csv_file:
csv_reader = csv.DictReader(csv_file)
fieldnames = csv_reader.fieldnames
# check for valid csv file
ClusterManagerUtils.check_has_valid_csv_headers(fieldnames, context)
for row in csv_reader:
new_username = AuthUtils.sanitize_username(Utils.get_value_as_string('Username', row))
valid_username_response = ''
try:
ClusterManagerUtils.check_valid_username(new_username)
except exceptions.SocaException as e:
if e.error_code == errorcodes.INVALID_PARAMS:
valid_username_response = e.message
else:
raise e
if not Utils.is_empty(valid_username_response):
csv_contains_error = True
new_email = Utils.get_value_as_string('Email', row).strip().lower()
valid_email_response = ''
try:
ClusterManagerUtils.check_valid_email(new_email)
except exceptions.SocaException as e:
if e.error_code == errorcodes.INVALID_PARAMS:
valid_email_response = e.message
else:
raise e
if not Utils.is_empty(valid_email_response):
csv_contains_error = True
new_is_admin = Utils.get_value_as_bool('Is Admin?', row, default=False)
new_usernames.append(new_username)
csv_validations.append((new_username, new_email, Utils.get_as_string(new_is_admin, default='False'), valid_username_response, valid_email_response))
user_table.add_row(new_username, new_email, Utils.get_as_string(new_is_admin))
new_users.append(User(username=new_username, email=new_email, sudo=new_is_admin))
duplicate_users_in_csv = ClusterManagerUtils.check_duplicate_users_in_csv(new_usernames)
if len(duplicate_users_in_csv) > 0:
table = Table()
table.add_column('Username', justify='left', no_wrap=False)
for user in duplicate_users_in_csv:
table.add_row(user)
context.print(table)
context.error('These users are repeated in the csv file; please ensure all usernames are unique in the csv file and try again.')
return
duplicate_users = ClusterManagerUtils.check_duplicate_users(new_usernames, context)
if len(duplicate_users) > 0:
csv_contains_duplicate = True
if csv_contains_error or csv_contains_duplicate:
for item in csv_validations:
is_duplicate = item[0] in duplicate_users
if not Utils.is_empty(item[3]) and not Utils.is_empty(item[4]):
status_table.add_row(item[0], item[1], item[2], "FAILED", f'{item[3]} AND {item[4]}', style='red')
elif not Utils.is_empty(item[3]):
status_table.add_row(item[0], item[1], item[2], "FAILED", f'{item[3]}', style='red')
elif not Utils.is_empty(item[4]) and is_duplicate:
status_table.add_row(item[0], item[1], item[2], "FAILED", f'user already exists; please remove them from the csv and try again AND {item[4]}', style='red')
elif not Utils.is_empty(item[4]):
status_table.add_row(item[0], item[1], item[2], "FAILED", f'{item[4]}', style='red')
elif is_duplicate:
status_table.add_row(item[0], item[1], item[2], "FAILED", 'user already exists; please remove them from the csv and try again', style='red')
else:
status_table.add_row(item[0], item[1], item[2], "ABORTED", 'csv file contains errors', style='bright_black')
context.print(status_table)
raise SystemExit(1)
if not force:
context.print(user_table)
continue_deployment = context.prompt(f'Are you sure you want to add these users?')
if not continue_deployment:
for new_user in new_users:
status_table.add_row(new_user.username, new_user.email, Utils.get_as_string(new_user.sudo, default='False'), 'ABORTED', 'Adding user Aborted!')
context.print(status_table)
raise SystemExit(1)
create_users_response = ClusterManagerUtils.create_users(new_users, context)
ClusterManagerUtils.print_create_user_status(create_users_response, context)