in resources/adhoc_scripts/Invoke_dms_tasks.py [0:0]
def main():
#Create boto3 client for AWS DMS service
client = boto3.client('dms')
#Open the file in read only and loop over each record for modifying and starting the dms task
with open('./credentials.csv',mode='r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
#Building the task name using servername and dbname from the file
task_name = '{}-{}-all'.format(row['servername'],row['dbname'])
print('Working on task:{}'.format(task_name))
#Fetching the task parameters using the api call describe_replication tasks
response = client.describe_replication_tasks(
Filters = [
{
'Name' : 'replication-task-id',
'Values' : [ task_name ]
}
],
WithoutSettings=True
)
taskArn = response['ReplicationTasks'][0]['ReplicationTaskArn']
#Modify replication task to enable cloudwatch logs since CDK do not support
print('Modifying the replication task : {} for enabling cloudwatch logs'.format(task_name))
response = client.modify_replication_task(
ReplicationTaskArn= taskArn,
ReplicationTaskIdentifier=task_name,
ReplicationTaskSettings="{\"Logging\": {\"EnableLogging\": true}}"
)
print('{}-Task Status:{}'.format(task_name,response['ReplicationTask']['Status']))
#Poll DMS task every 15 secs to identify if the task state set back to Ready
print('Waiting for the task to be Ready State')
waiter_min = 20 #5 mins - Sleep 15 secs each time
for attempt in range(waiter_min) :
response = client.describe_replication_tasks(
Filters = [
{
'Name' : 'replication-task-id',
'Values' : [ task_name ]
}
],
WithoutSettings=True
)
if response['ReplicationTasks'][0]['Status'] == "ready" :
print('Task status is Ready and resuming/start the task')
break
else :
time.sleep(15)
print(attempt)
#Calling boto3 api start_replicaiton_task for starting the DMS task
print('Starting replication_task:{}'.format(task_name))
response = client.start_replication_task(
ReplicationTaskArn=taskArn,
StartReplicationTaskType= 'start-replication'
)
print('{} task status : {}'.format(task_name,response['ReplicationTask']['Status']))