in granular_access/lambda_functions/downgrade_user/downgrade_user.py [0:0]
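def _entitled_to_author_or_admin(groups, ns, roles):
    """Return True if any ``quicksight-fed-*`` group still entitles the user to AUTHOR/ADMIN.

    Group names are expected in the form ``quicksight-fed-<suffix>``; the role-map
    lookup key is ``"<ns>_<suffix>"``. The first group that maps to ``AUTHOR`` or
    ``ADMIN`` wins. A fed group with no entry in ``roles`` is counted as AUTHOR
    (fail-open: the user is kept) while scanning continues, matching the original
    behaviour. A group name containing ``quicksight-fed`` but with fewer than two
    ``-`` separators raises ``IndexError`` from the split, as it always did.

    Args:
        groups: Group records as returned by ``list_user_groups``; each has ``GroupName``.
        ns: QuickSight namespace name used to build the lookup key.
        roles: Mapping ``{"<ns>_<suffix>": "AUTHOR" | "ADMIN" | ...}`` from SSM.
    """
    entitled = False
    for group in groups:
        name = group['GroupName']
        if 'quicksight-fed' not in name:
            continue
        lookup = ns + '_' + name.split('-', 2)[2]
        if lookup not in roles:
            # Unmapped fed group: treated as AUTHOR so the user is retained.
            entitled = True
            continue
        if roles[lookup] in ('AUTHOR', 'ADMIN'):
            return True
    return entitled


def lambda_handler(event, context):
    """Delete QuickSight users whose groups no longer grant AUTHOR/ADMIN.

    For every namespace and user in the account, the user's ``quicksight-fed-*``
    groups are mapped through the SSM role map at ``/qs/config/roles``. A user
    whose current QuickSight ``Role`` is not ``READER`` but who is no longer
    entitled to AUTHOR/ADMIN by any group is deleted (this is how the downgrade
    is enforced). Each deletion is appended to a CSV row
    ``['Deleted', namespace, user_name, old_role]`` that is uploaded to
    ``monitoring/quicksight/logs/delete_user_log.csv`` in the bucket returned by
    ``get_s3_info()``.

    Args:
        event: Lambda event payload (unused).
        context: Lambda context (unused).

    Raises:
        Any exception from ``delete_user`` whose message does not contain
        ``'does not exist'`` is re-raised, which aborts the run before the log
        is uploaded.

    Notes:
        * ``aws_region`` and the helpers ``get_s3_info``, ``get_ssm_parameters``,
          ``list_namespaces``, ``list_users``, ``list_user_groups`` and
          ``delete_user`` are defined elsewhere in this module.
        * A ``quicksight-fed-*`` group with no entry in the role map is treated
          as AUTHOR, i.e. the check fails open and the user is kept. Confirm this
          is intended before relying on it for access revocation.
        * The log object key is fixed, so every run overwrites the previous log;
          enable bucket versioning if the history matters.
    """
    sts_client = boto3.client("sts", region_name=aws_region)
    account_id = sts_client.get_caller_identity()["Account"]
    bucket = boto3.resource('s3').Bucket(get_s3_info())
    key = 'monitoring/quicksight/logs/delete_user_log.csv'

    # {"<namespace>_<group suffix>": role} loaded from SSM.
    roles = get_ssm_parameters('/qs/config/roles')
    print(roles)

    deleted_rows = []
    for namespace in list_namespaces(account_id, aws_region):
        ns = namespace['Name']
        for user in list_users(account_id, aws_region, ns):
            user_name = user['UserName']
            print(user_name)
            role = user['Role']
            print(role)
            groups = list_user_groups(user_name, account_id, aws_region, ns)
            if _entitled_to_author_or_admin(groups, ns, roles):
                continue
            if role == 'READER':
                # Already at the lowest role; nothing to revoke.
                continue
            try:
                delete_user(user_name, account_id, aws_region, ns)
            except Exception as e:
                # Tolerate only a user that vanished between list and delete.
                # The previous `str(e).find('does not exist')` truthiness test
                # returned -1 (truthy) for every other message, so failed
                # revocations were silently swallowed.
                if 'does not exist' in str(e):
                    print(e)
                    continue
                raise
            print(user_name + " is deleted because of permissions downgrade from author/admin to reader!")
            deleted_rows.append(['Deleted', ns, user_name, role])

    # Write the audit CSV to a scratch directory that is removed afterwards.
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'delete_user_log.csv')
        with open(path, 'w', newline='') as outfile:
            csv.writer(outfile).writerows(deleted_rows)
        bucket.upload_file(path, key)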