in cid/helpers/quicksight.py [0:0]
def create_dashboard(self, definition: dict, **kwargs) -> Dashboard:
    """ Creates an AWS QuickSight dashboard

    Builds the request from ``definition``, merges any caller overrides
    from ``kwargs`` into it, submits it via ``self.client.create_dashboard``
    and then describes the version that was just created.

    Keys read from ``definition``:
        dashboardId: used as ``DashboardId`` for both the create and the
            describe call.
        name: used as the dashboard ``Name``.
        datasets: optional mapping; each key becomes a
            ``DataSetPlaceholder`` and each value the matching
            ``DataSetArn``. A missing key yields an empty reference list.
        sourceTemplate: mapping providing ``Arn`` and
            ``Version.VersionNumber``; combined into
            ``<Arn>/version/<VersionNumber>``.

    Args:
        definition: dashboard definition as described above.
        **kwargs: extra request parameters, merged into the generated
            request with ``always_merger.merge`` before it is sent.

    Returns:
        The object returned by ``self.describe_dashboard`` for the created
        dashboard version.

    Raises:
        Exception: when the described dashboard reports a falsy ``health``;
            the exception argument is the ``Errors`` entry of the dashboard
            version.
        AttributeError: when ``definition`` has no ``sourceTemplate`` (or it
            has no ``Version``), because ``.get`` is called on ``None``.

    Errors raised by ``self.client.create_dashboard`` (for example
    ``ResourceExistsException``) are not handled here and propagate to the
    caller unchanged.
    """
    dataset_references = [
        {'DataSetPlaceholder': placeholder, 'DataSetArn': dataset_arn}
        for placeholder, dataset_arn in definition.get('datasets', dict()).items()
    ]

    # The template is addressed by a version-qualified ARN.
    source_template = definition.get('sourceTemplate')
    template_arn = source_template.get('Arn')
    template_version = source_template.get('Version').get('VersionNumber')

    create_parameters = {
        'AwsAccountId': self.account_id,
        'DashboardId': definition.get('dashboardId'),
        'Name': definition.get('name'),
        'Permissions': [
            {
                "Principal": self.user.get('Arn'),
                "Actions": [
                    "quicksight:DescribeDashboard",
                    "quicksight:ListDashboardVersions",
                    "quicksight:UpdateDashboardPermissions",
                    "quicksight:QueryDashboard",
                    "quicksight:UpdateDashboard",
                    "quicksight:DeleteDashboard",
                    "quicksight:DescribeDashboardPermissions",
                    "quicksight:UpdateDashboardPublishedVersion"
                ]
            }
        ],
        'SourceEntity': {
            'SourceTemplate': {
                'Arn': f"{template_arn}/version/{template_version}",
                'DataSetReferences': dataset_references
            }
        }
    }
    # Caller-supplied kwargs are merged into the generated request.
    create_parameters = always_merger.merge(create_parameters, kwargs)

    # No try/except here: the previous handler only re-raised
    # ResourceExistsException, which is what happens without it as well.
    create_status = self.client.create_dashboard(**create_parameters)
    logger.debug(create_status)

    # The version number is the last path segment of the returned VersionArn.
    created_version = int(create_status['VersionArn'].split('/')[-1])

    # Describe exactly the version that was created. poll=True is forwarded
    # to describe_dashboard -- presumably it waits until creation has
    # finished; confirm against describe_dashboard.
    describe_parameters = {
        'DashboardId': definition.get('dashboardId'),
        'VersionNumber': created_version
    }
    dashboard = self.describe_dashboard(poll=True, **describe_parameters)
    if not dashboard.health:
        failure_reason = dashboard.version.get('Errors')
        raise Exception(failure_reason)
    return dashboard