in source/lambda/quicksight-custom-resources/util/datasource.py [0:0]
def create(self):
    """Create the QuickSight Athena data source, reusing it if it already exists.

    Calls ``create_data_source`` with type ``ATHENA``, using
    ``self.athena_workgroup`` as the Athena work group and leaving SSL
    enabled. If QuickSight reports that a data source with ``self.id``
    already exists, the existing one is fetched with
    ``describe_data_source`` instead of failing.

    Side effects:
        Sets ``self.arn`` to the ``"Arn"`` value of the response.

    Returns:
        dict: The raw ``create_data_source`` response when the data source
        was created, or the ``"DataSource"`` entry of the
        ``describe_data_source`` response when it already existed. The two
        shapes are not the same; only ``"Arn"`` is relied upon here.

    Raises:
        Any client exception other than ``ResourceExistsException`` is not
        handled here and propagates to the caller.
    """
    logger.info(f"creating quicksight datasource id:{self.id}")
    quicksight_client = get_quicksight_client()
    data_source_parameters = {"AthenaParameters": {"WorkGroup": self.athena_workgroup}}

    # Keep the try body down to the single call that can raise
    # ResourceExistsException; success-path logging lives in ``else``.
    try:
        response = quicksight_client.create_data_source(
            AwsAccountId=self.aws_account_id,
            DataSourceId=self.id,
            Name=self.name,
            Type="ATHENA",
            DataSourceParameters=data_source_parameters,
            Permissions=self._get_permissions(),
            SslProperties={"DisableSsl": False},
        )
    except quicksight_client.exceptions.ResourceExistsException:
        # Already created (e.g. by an earlier invocation): fall back to the
        # existing resource so the ARN can still be recorded.
        logger.info(f"datasource for id:{self.id} already exists")
        described = quicksight_client.describe_data_source(
            AwsAccountId=self.aws_account_id,
            DataSourceId=self.id,
        )
        response = described["DataSource"]
    else:
        # A space separates the id from the response dump; the original
        # adjacent f-strings ran the two together.
        logger.info(f"finished creating quicksight datasource for id:{self.id} response {response}")

    self.arn = response["Arn"]
    return response