in Assets_as_Code/Assets_as_Code/src/functions.py [0:0]
def create_data_source(source, session, target):
    """Create a QuickSight data source from ``source``, retargeted via ``target``.

    The connection details found in ``source`` are rewritten to point at the
    target environment and the result is passed to the QuickSight client's
    ``create_data_source`` call.

    Args:
        source: Data source definition (dict). ``Type``, ``DataSourceId`` and
            ``Name`` are required; ``DataSourceParameters``,
            ``SslProperties`` and ``VpcConnectionProperties`` are optional.
            NOTE: nested dicts of ``source`` are updated in place.
        session: Object exposing ``client(service_name)``; the ``quicksight``
            and ``sts`` clients are requested from it (presumably a boto3
            session -- confirm against callers).
        target: Target configuration (dict). Reads ``vpc``, ``tag`` and
            ``datasourcepermission``, plus ``rds``/``redshift`` and
            ``credential`` entries for the matching data source type.

    Returns:
        The response of ``create_data_source`` on success. If that call
        raises, a dict ``{"DataSource": <request args>, "Error": <message>}``
        is returned instead of propagating the exception.

    Raises:
        Exception: ``source`` has ``VpcConnectionProperties`` but
            ``target['vpc']`` is ``None``.
        KeyError: a required key is missing from ``source`` or ``target``.
    """
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    credential = None

    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.create_data_source
    # Lower-cased data source type -> name of its engine-specific parameter
    # structure. The keys double as the list of RDS-style types handled here.
    conn_dict = {
        "aurora": "AuroraParameters",
        "aurora_postgresql": "AuroraPostgreSqlParameters",
        "mariadb": "MariaDbParameters",
        "mysql": "MySqlParameters",
        "postgresql": "PostgreSqlParameters",
        "sqlserver": "SqlServerParameters",
    }

    source_type = source['Type'].lower()
    # 'DataSourceParameters' is optional (it is only forwarded when present
    # further down), so never index it unconditionally. When it is present
    # and non-empty this is the very same dict, so updates stay in place.
    params = source.get('DataSourceParameters') or {}

    # rds
    if source_type in conn_dict:
        conn_name = conn_dict[source_type]
        if 'RdsParameters' in params:
            # Update data source instance name
            params['RdsParameters']['InstanceId'] = target['rds']['rdsinstanceid']
            credential = target['credential']['rdscredential']
        elif conn_name in params:
            # Update data source parameters
            params[conn_name]['Host'] = target['rds']['rdsinstanceid']
            credential = target['credential']['rdscredential']

    # redshift
    if source['Type'] == "REDSHIFT":
        cluster = params.get('RedshiftParameters')
        if cluster is not None:
            # Update data source instance name
            if 'ClusterId' in cluster:
                cluster['ClusterId'] = target['redshift']['ClusterId']
            cluster['Host'] = target['redshift']['Host']
            # The database is only overridden when the target names one and
            # the source definition already carried a database entry.
            if (target['redshift']['Database'] is not None
                    and 'Database' in cluster):
                cluster['Database'] = target['redshift']['Database']
        credential = target['credential']['redshiftcredential']

    if 'VpcConnectionProperties' in source:
        if target['vpc'] is None:
            raise Exception("Sorry, you need the targetvpc information")
        source['VpcConnectionProperties']['VpcConnectionArn'] = target['vpc']

    args: Dict[str, Any] = {
        "AwsAccountId": account_id,
        "DataSourceId": source['DataSourceId'],
        "Name": source['Name'],
        "Type": source['Type'],
    }
    if "SslProperties" in source:
        args["SslProperties"] = source['SslProperties']
    if 'DataSourceParameters' in source:
        args["DataSourceParameters"] = source['DataSourceParameters']
    if target['tag'] is not None:
        args["Tags"] = target['tag']
    if credential is not None:
        args["Credentials"] = credential
    if 'VpcConnectionProperties' in source:
        args["VpcConnectionProperties"] = source['VpcConnectionProperties']
    args["Permissions"] = target['datasourcepermission']

    try:
        return qs.create_data_source(**args)
    except Exception as e:
        # Failures are reported to the caller as data rather than raised.
        # NOTE(review): ``args`` may contain "Credentials"; callers should
        # avoid persisting or logging this payload verbatim.
        return {"DataSource": args, "Error": str(e)}