in dms_cdk/dms_cdk_stack.py [0:0]
def __init__(self, scope: core.Construct, construct_id: str, dms_target_s3_access_role, stage_bucket,
sns_topic,vpc_default_security_group, vpc_subnet_group, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
stack = core.Stack.of(self)
params = {}
# Determine account type based on env variable for fetching common parameters
# suitable instance class, max filesize that DMS split data into files and replicate to S3 stage bucket
current_dir = os.path.dirname(__file__)
param_path = '../resources/config/parameters.txt'
# Open a reader to the csv, the delimiter is a single space
with open(os.path.join(current_dir, param_path), mode='r') as infile:
reader = csv.reader(infile, delimiter=',', skipinitialspace=True)
next(reader)
params = {key: row for key, *row in reader}
dms_instance_class = params['instancesize'][0]
target_max_file_size = params['maxfilesize'][0]
task_migration_type = params['task_migration_type'][0]
# boto3 client for Secrets Manager
sm_client = boto3.client("secretsmanager")
param_path = '../resources/config/dms_config_details.txt'
with open(os.path.join(current_dir, param_path), mode='r') as csvfile:
# Open a reader to the csv, the delimiter is a single space
reader = csv.DictReader(csvfile)
prevInstName = ''
vpc_subnet_group_ids = []
replication_subnet_group_identifier = 'aws-cdk-cdk-subnetgroup'
for i in vpc_subnet_group:
vpc_subnet_group_ids.append(i.subnet_id)
subnet = dms.CfnReplicationSubnetGroup(
self,
"DMSReplicationSubnetGrp",
replication_subnet_group_identifier=replication_subnet_group_identifier,
replication_subnet_group_description='DMS replication subnet group',
subnet_ids=vpc_subnet_group_ids
)
for row in reader:
replInstName = "{}-instance".format(row["server"])
replInstIdentifier= "{}-instance".format(row["server"])
replTaskName = "{}-{}-all".format(row["server"],row["dbname"])
sourceEndPoint = "{}-{}-sqlserver-source-endpoint".format(row["server"],row["dbname"])
targetEndPoint = "{}-{}-s3-target-endpoint".format(row["server"],row["dbname"])
SecretId = "dms_{}_{}_sql_server".format(row["server"],row["dbname"])
s3_prefix = "data/{}/{}/".format(row["server"],row["dbname"])
vpc_security_group_id = vpc_default_security_group
#Fetch credentials for each database from AWS Secret manager
get_secret_value_response = sm_client.get_secret_value(SecretId=SecretId)
secret = get_secret_value_response['SecretString']
secret = json.loads(secret)
#Create replication instance per server
if replInstName != prevInstName :
instance = dms.CfnReplicationInstance(
self,
replInstName,
replication_instance_identifier=replInstIdentifier,
replication_instance_class=dms_instance_class,
publicly_accessible=False,
replication_subnet_group_identifier=subnet.ref,
vpc_security_group_ids=[vpc_security_group_id],
auto_minor_version_upgrade=True,
multi_az=True,
engine_version='3.4.3'
)
prevInstName = replInstName
# create source endpoint
source = dms.CfnEndpoint(
self,
sourceEndPoint,
endpoint_identifier=sourceEndPoint,
endpoint_type='source',
engine_name=secret.get('engine'),
server_name=secret.get('host'),
port=int(secret.get('port')),
database_name=secret.get('dbname'),
username=secret.get('username'),
password=secret.get('password'),
ssl_mode="require"
)
# create target endpoint
target = dms.CfnEndpoint(
self,
targetEndPoint,
endpoint_identifier=targetEndPoint,
endpoint_type='target',
engine_name='s3',
s3_settings=dms.CfnEndpoint.S3SettingsProperty(
bucket_name=stage_bucket.bucket_name,
bucket_folder=s3_prefix,
compression_type="GZIP",
service_access_role_arn=dms_target_s3_access_role.role_arn
),
extra_connection_attributes=f"encryptionMode=SSE_S3;timestampColumnName=TX_TIMESTAMP;dataFormat=parquet;parquetVersion=PARQUET_2_0;parquetTimestampInMillisecond=true;maxFileSize={target_max_file_size};",
)
mappings_location = '../resources/config/dms_json_mappings/dms_{}_{}_mappings.json'.format(row["server"],row["dbname"])
with open(os.path.join(current_dir, mappings_location.lower()), mode='r') as jsonfile:
mappings_json = json.load(jsonfile)
# create dms replication task
task = dms.CfnReplicationTask(
self,
replTaskName,
replication_task_identifier=replTaskName,
replication_instance_arn=instance.ref,
migration_type=task_migration_type,
source_endpoint_arn=source.ref,
target_endpoint_arn=target.ref,
table_mappings=json.dumps(mappings_json)
)
# event notifcation if instance fails or task fails
failure_event_instance = dms.CfnEventSubscription(
self,
"failure_event_instance",
sns_topic_arn=sns_topic.topic_arn,
enabled=True,
event_categories=["failure"],
source_type="replication-instance"
)
failure_event_task = dms.CfnEventSubscription(
self,
"failure_event_task",
sns_topic_arn=sns_topic.topic_arn,
enabled=True,
event_categories=["failure"],
source_type="replication-task"
)