in cid/common.py [0:0]
def deploy(self, **kwargs):
    """Interactively pick a dashboard definition and deploy it.

    Presents every entry of ``self.resources['dashboards']`` in a selection
    prompt, passes the chosen definition's ``dependsOn.datasets`` names to
    ``self.create_datasets``, records the ARNs of those datasets in the
    definition, optionally loads extra creation parameters from a local
    overrides JSON file, and finally calls ``self.qs.create_dashboard``.

    Args:
        **kwargs: Accepted for interface compatibility. The values are not
            read by this method; creation parameters come only from the
            local overrides file.

    Returns:
        The ``dashboardId`` of the selected dashboard definition, or
        ``None`` when the prompt fails or is cancelled.

    Raises:
        SystemExit: With code 1 when dashboard creation fails for any
            reason other than the dashboard already existing.
    """
    # One menu entry per known dashboard; the entry's value is the resource key
    selection = [
        questionary.Choice(
            title=f"{dashboard.get('name')}",
            value=dashboard_key
        )
        for dashboard_key, dashboard in self.resources.get('dashboards').items()
    ]
    try:
        selected_dashboard = questionary.select(
            "Please select dashboard to install",
            choices=selection
        ).ask()
    except Exception:
        print('\nEnd: No updates available or dashboard(s) is/are broken\n')
        return None
    if selected_dashboard is None:
        # questionary's ask() returns None when the user cancels the prompt
        # (e.g. Ctrl-C); there is nothing to deploy in that case.
        print('\nEnd: No dashboard selected\n')
        return None
    # Get selected dashboard definition
    dashboard_definition = self.resources.get(
        'dashboards').get(selected_dashboard)
    required_datasets = dashboard_definition.get(
        'dependsOn', dict()).get('datasets', list())
    self.create_datasets(required_datasets)
    # Prepare API parameters
    if not dashboard_definition.get('datasets'):
        dashboard_definition.update({'datasets': {}})
    dashboard_datasets = dashboard_definition.get('datasets')
    for dataset_name in required_datasets:
        # First known dataset with a matching name wins; datasets without a
        # usable ARN are left out of the definition.
        arn = next(
            (v.get('Arn') for v in self.qs._datasets.values()
             if v.get('Name') == dataset_name),
            None
        )
        if arn:
            dashboard_datasets.update({dataset_name: arn})
    # Extra keyword arguments for create_dashboard; stays empty unless the
    # user confirms loading the local overrides file.
    create_params = dict()
    local_overrides = f'work/{self.awsIdentity.get("Account")}/{dashboard_definition.get("dashboardId")}.json'
    print(
        f'Looking for local overrides file "{local_overrides}"...', end='')
    try:
        with open(local_overrides, 'r', encoding='utf-8') as r:
            try:
                print('found')
                if click.confirm(f'Use local overrides from {local_overrides}?'):
                    create_params = json.load(r)
                    print('loaded')
            except Exception as e:
                # Catch exception and dump a reason; deployment continues
                # without overrides.
                click.echo('failed to load, dumping error message')
                print(json.dumps(e, indent=4, sort_keys=True, default=str))
    except FileNotFoundError:
        print('not found')
    # Get QuickSight template details
    latest_template = self.qs.describe_template(
        template_id=dashboard_definition.get('templateId'),
        account_id=dashboard_definition.get('sourceAccountId')
    )
    dashboard_definition.update({'sourceTemplate': latest_template})
    # Create dashboard
    click.echo(
        f"Latest template: {latest_template.get('Arn')}/version/{latest_template.get('Version').get('VersionNumber')}")
    click.echo('\nDeploying...', nl=False)
    _url = self.qs_url.format(
        dashboard_id=dashboard_definition.get('dashboardId'), **self.qs_url_params
    )
    try:
        self.qs.create_dashboard(dashboard_definition, **create_params)
        click.echo('completed')
        click.echo(
            f"#######\n####### Congratulations!\n####### {dashboard_definition.get('name')} is available at: {_url}\n#######")
    except self.qs.client.exceptions.ResourceExistsException:
        click.echo('error, already exists')
        click.echo(
            f"#######\n####### {dashboard_definition.get('name')} is available at: {_url}\n#######")
    except Exception as e:
        # Catch exception and dump a reason
        click.echo('failed, dumping error message')
        print(json.dumps(e, indent=4, sort_keys=True, default=str))
        # Same effect as exit(1), without relying on the helper that the
        # `site` module injects into builtins.
        raise SystemExit(1)
    # Report the dashboard that was actually selected, not the last one
    # visited while building the selection menu.
    return dashboard_definition.get('dashboardId')