in granular_access/lambda_functions/granular_access_assets_govenance/granular_access_assets_govenance.py [0:0]
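# --- permission sets, kept verbatim from the original grant calls -----------
_DASHBOARD_FULL_ACTIONS = ['quicksight:DescribeDashboard',
                           'quicksight:ListDashboardVersions',
                           'quicksight:UpdateDashboardPermissions',
                           'quicksight:QueryDashboard',
                           'quicksight:UpdateDashboard',
                           'quicksight:DeleteDashboard',
                           'quicksight:DescribeDashboardPermissions',
                           'quicksight:UpdateDashboardPublishedVersion']
_DASHBOARD_READ_ACTIONS = ['quicksight:DescribeDashboard',
                           'quicksight:ListDashboardVersions',
                           'quicksight:QueryDashboard']
_DATASET_FULL_ACTIONS = ['quicksight:UpdateDataSetPermissions',
                         'quicksight:DescribeDataSet',
                         'quicksight:DescribeDataSetPermissions',
                         'quicksight:PassDataSet',
                         'quicksight:DescribeIngestion',
                         'quicksight:ListIngestions',
                         'quicksight:UpdateDataSet',
                         'quicksight:DeleteDataSet',
                         'quicksight:CreateIngestion',
                         'quicksight:CancelIngestion']
_DATASET_READ_ACTIONS = ['quicksight:DescribeDataSet',
                         'quicksight:DescribeDataSetPermissions',
                         'quicksight:PassDataSet',
                         'quicksight:DescribeIngestion',
                         'quicksight:ListIngestions',
                         'quicksight:CreateIngestion',
                         'quicksight:CancelIngestion']
_DATASOURCE_FULL_ACTIONS = ["quicksight:DescribeDataSource",
                            "quicksight:DescribeDataSourcePermissions",
                            "quicksight:PassDataSource",
                            "quicksight:UpdateDataSource",
                            "quicksight:DeleteDataSource",
                            "quicksight:UpdateDataSourcePermissions"]
_DATASOURCE_READ_ACTIONS = ["quicksight:DescribeDataSource",
                            "quicksight:DescribeDataSourcePermissions",
                            "quicksight:PassDataSource"]
_ANALYSIS_FULL_ACTIONS = ['quicksight:RestoreAnalysis',
                          'quicksight:UpdateAnalysisPermissions',
                          'quicksight:DeleteAnalysis',
                          'quicksight:QueryAnalysis',
                          'quicksight:DescribeAnalysisPermissions',
                          'quicksight:DescribeAnalysis',
                          'quicksight:UpdateAnalysis']
_THEME_FULL_ACTIONS = ["quicksight:DescribeTheme",
                       "quicksight:DescribeThemeAlias",
                       "quicksight:ListThemeAliases",
                       "quicksight:ListThemeVersions",
                       "quicksight:DeleteTheme",
                       "quicksight:UpdateTheme",
                       "quicksight:CreateThemeAlias",
                       "quicksight:DeleteThemeAlias",
                       "quicksight:UpdateThemeAlias",
                       "quicksight:UpdateThemePermissions",
                       "quicksight:DescribeThemePermissions"]

# Sample datasets that are never re-permissioned.
_SAMPLE_DATASETS = ['Business Review', 'People Overview',
                    'Sales Pipeline',
                    'Web and Social Media Analytics']
# For read-only groups the governance tables are excluded as well: they drive
# row-level security and access decisions, so a reader must not see them.
# (The original list was missing a comma between 'user_attributes' and
# 'groups_users', which silently merged them into one name and left both
# datasets shareable.)
_READ_ALL_EXCLUDED_DATASETS = _SAMPLE_DATASETS + ['rls', 'user_attributes',
                                                  'groups_users', 'data_access',
                                                  'object_access']
# QuickSight built-in themes; permissions on them are not managed here.
_BUILTIN_THEMES = ['SEASIDE', 'CLASSIC', 'MIDNIGHT']
# Groups that keep dashboard access even when a dashboard is not in their list.
_REVOKE_EXEMPT_GROUPS = ['quicksight-fed-bi-developer', 'quicksight-fed-bi-admin',
                         'quicksight-fed-power-reader']

_ERROR_LOG_KEY = 'monitoring/quicksight/errors/assets_governance_error_log.csv'
_ERROR_LOG_NAME = 'assets_governance_error_log.csv'


def _grant(api_call, ignore_substring=None, **kwargs):
    """Call one QuickSight ``update_*_permissions`` API, logging failures.

    Failures never abort the run; they are printed so the remaining assets are
    still processed.  When *ignore_substring* is given, errors whose text
    contains it are dropped without output (used for dataset/data-source
    calls whose error mentions 'FILE' — presumably file-backed assets whose
    permissions cannot be updated; confirm against the actual error text).

    Note: the original tested ``str(e).find(...)``, whose -1 "not found"
    result is truthy, so it suppressed almost every error instead of only the
    matching ones.
    """
    try:
        api_call(**kwargs)
    except Exception as e:  # best effort: continue with the next asset
        if ignore_substring is None or ignore_substring not in str(e):
            print(e)


def _grant_full_access(qs, account_id, arn, assets):
    """Grant *arn* full control over every managed dashboard, dataset,
    data source, non-deleted analysis and custom theme."""
    for dashboard in assets['dashboards']:
        _grant(qs.update_dashboard_permissions,
               AwsAccountId=account_id,
               DashboardId=dashboard['DashboardId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _DASHBOARD_FULL_ACTIONS}])
    for dataset in assets['datasets']:
        if dataset['Name'] in _SAMPLE_DATASETS:
            continue
        _grant(qs.update_data_set_permissions, ignore_substring='FILE',
               AwsAccountId=account_id,
               DataSetId=dataset['DataSetId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _DATASET_FULL_ACTIONS}])
    for datasource in assets['datasources']:
        _grant(qs.update_data_source_permissions, ignore_substring='FILE',
               AwsAccountId=account_id,
               DataSourceId=datasource['DataSourceId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _DATASOURCE_FULL_ACTIONS}])
    for analysis in assets['analyses']:
        if analysis['Status'] == 'DELETED':
            continue
        _grant(qs.update_analysis_permissions,
               AwsAccountId=account_id,
               AnalysisId=analysis['AnalysisId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _ANALYSIS_FULL_ACTIONS}])
    for theme in assets['themes']:
        if theme['ThemeId'] in _BUILTIN_THEMES:
            continue
        _grant(qs.update_theme_permissions,
               AwsAccountId=account_id,
               ThemeId=theme['ThemeId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _THEME_FULL_ACTIONS}])


def _grant_read_access(qs, account_id, arn, assets):
    """Grant *arn* read-only access to all dashboards, non-governance
    datasets and data sources."""
    for dashboard in assets['dashboards']:
        _grant(qs.update_dashboard_permissions,
               AwsAccountId=account_id,
               DashboardId=dashboard['DashboardId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _DASHBOARD_READ_ACTIONS}])
    for dataset in assets['datasets']:
        if dataset['Name'] in _READ_ALL_EXCLUDED_DATASETS:
            continue
        _grant(qs.update_data_set_permissions, ignore_substring='FILE',
               AwsAccountId=account_id,
               DataSetId=dataset['DataSetId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _DATASET_READ_ACTIONS}])
    for datasource in assets['datasources']:
        _grant(qs.update_data_source_permissions, ignore_substring='FILE',
               AwsAccountId=account_id,
               DataSourceId=datasource['DataSourceId'],
               GrantPermissions=[{'Principal': arn,
                                  'Actions': _DATASOURCE_READ_ACTIONS}])


def _grant_named_dashboards(qs, account_id, arn, report_names, error_rows):
    """Grant *arn* read access to each dashboard named in *report_names*.

    Returns the list of dashboard ids that were granted.  A name that resolves
    to zero or to several dashboards is not granted; one row per problem is
    appended to *error_rows* instead.
    """
    granted_ids = []
    for reportname in report_names:
        ids = get_dashboard_ids(reportname, account_id, lambda_aws_region)
        if len(ids) == 1:
            granted_ids.append(ids[0])
            _grant(qs.update_dashboard_permissions,
                   AwsAccountId=account_id,
                   DashboardId=ids[0],
                   GrantPermissions=[{'Principal': arn,
                                      'Actions': _DASHBOARD_READ_ACTIONS}])
        elif len(ids) > 1:
            for dashboard_id in ids:
                error_rows.append(['Duplicate Dashboard Name', reportname, dashboard_id])
        else:
            error_rows.append(['Dashboard not existed', reportname, 'None'])
    return granted_ids


def _revoke_unlisted_dashboards(qs, account_id, arn, group, dashboards, allowed_ids):
    """Revoke *arn*'s read access to every dashboard not in *allowed_ids*.

    Exempt groups (admins, developers, power readers) are left untouched.
    Unlike grants, an unexpected revoke failure is re-raised: a revoke that
    silently fails leaves access in place, which is the one outcome this
    function exists to prevent.
    """
    if group in _REVOKE_EXEMPT_GROUPS:
        return
    for dashboard in dashboards:
        dashboardid = dashboard['DashboardId']
        if dashboardid in allowed_ids:
            continue
        print("revoke " + group + "dashboard id: " + dashboardid + "(name: " + dashboard['Name'] + ")")
        try:
            qs.update_dashboard_permissions(
                AwsAccountId=account_id,
                DashboardId=dashboardid,
                RevokePermissions=[{'Principal': arn,
                                    'Actions': _DASHBOARD_READ_ACTIONS}],
            )
        except Exception as e:
            # A group that never held the permission is reported by the API as
            # 'Invalid principals given'; that is the expected no-op case.
            # (The original's ``str(e).find(...)`` truthiness test swallowed
            # every other error and raised only on the expected one.)
            if 'Invalid principals given' not in str(e):
                raise


def _write_error_log(bucket, error_rows):
    """Write *error_rows* as CSV to a temp file and upload it to *bucket*.

    A TemporaryDirectory is used so the file does not accumulate in the
    Lambda's /tmp across warm invocations (the original never removed it).
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, _ERROR_LOG_NAME)
        with open(path, 'w', newline='') as outfile:
            csv.writer(outfile).writerows(error_rows)
        bucket.upload_file(path, _ERROR_LOG_KEY)


def lambda_handler(event, context):
    """Apply the group-to-asset access policy held in SSM to QuickSight.

    Reads the ``Permissions`` list from the '/qs/config/access' parameter.
    For each entry a group principal ARN is built and, depending on the
    entry's ``Reports`` list, the group is granted full control of all assets
    ('all'), read-only access to all assets ('read-all'), or read access to
    the named dashboards.  Read access to any dashboard not granted in this
    entry is then revoked unless the group is exempt.  Unresolvable dashboard
    names are written to a CSV error log in S3.

    Args:
        event: Lambda event; unused.
        context: Lambda context; unused.

    Raises:
        Exception: re-raised from a dashboard revoke that fails for a reason
            other than the principal not holding the permission.
    """
    sts_client = boto3.client("sts", region_name=aws_region)
    account_id = sts_client.get_caller_identity()["Account"]
    qs = boto3.client('quicksight', region_name=lambda_aws_region)
    bucket = boto3.resource('s3').Bucket(get_s3_info())

    assets = {
        'dashboards': list_dashboards(account_id, lambda_aws_region),
        'analyses': list_analyses(account_id, lambda_aws_region),
        'datasets': list_datasets(account_id, lambda_aws_region),
        'datasources': list_data_sources(account_id, lambda_aws_region),
        'themes': list_themes(account_id, lambda_aws_region),
    }

    permissions = get_ssm_parameters('/qs/config/access')
    print(permissions)
    permissions = permissions['Permissions']

    errorlists = []
    for permission in permissions:
        # NOTE(review): the group ARN uses aws_region while every API call
        # uses lambda_aws_region — confirm the two are meant to differ.
        group = "quicksight-fed-" + permission['Group_Name'].lower()
        arn = ('arn:aws:quicksight:' + aws_region + ':' + account_id + ":group/"
               + permission['ns_name'] + "/" + group)
        reportnamels = permission['Reports']
        print(reportnamels)

        # Dashboard ids this group is allowed to keep.  Reset per group: the
        # original accumulated ids across groups, so a later group kept read
        # access to every dashboard granted to an earlier one.
        reportlist = []
        if reportnamels:
            if reportnamels[0] == 'all':
                _grant_full_access(qs, account_id, arn, assets)
            elif reportnamels[0] == 'read-all':
                _grant_read_access(qs, account_id, arn, assets)
            else:
                reportlist = _grant_named_dashboards(qs, account_id, arn,
                                                     reportnamels, errorlists)

        print(group + 'can view ')
        print(reportnamels)
        print('and ids are: ')
        print(reportlist)
        # 'all'/'read-all' groups have an empty reportlist, so they are only
        # spared revocation by being in the exempt list (original behaviour).
        _revoke_unlisted_dashboards(qs, account_id, arn, group,
                                    assets['dashboards'], reportlist)

    _write_error_log(bucket, errorlists)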