def main()

in resources/adhoc_scripts/Invoke_dms_tasks.py [0:0]


def main():
    #Create boto3 client for AWS DMS service
    client = boto3.client('dms')

    #Open the file in read only and loop over each record for modifying and starting the dms task
    with open('./credentials.csv',mode='r') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            #Building the task name using servername and dbname from the file
            task_name = '{}-{}-all'.format(row['servername'],row['dbname'])
            print('Working on task:{}'.format(task_name))

            #Fetching the task parameters using the api call describe_replication tasks
            response = client.describe_replication_tasks(
                    Filters = [
                        {
                            'Name' : 'replication-task-id',
                            'Values' : [ task_name ]
                        }
                    ],
                    WithoutSettings=True
                )
            taskArn = response['ReplicationTasks'][0]['ReplicationTaskArn']

            #Modify replication task to enable cloudwatch logs since CDK do not support
            print('Modifying the replication task : {} for enabling cloudwatch logs'.format(task_name))
            response = client.modify_replication_task(
                    ReplicationTaskArn= taskArn,
                    ReplicationTaskIdentifier=task_name,
                    ReplicationTaskSettings="{\"Logging\": {\"EnableLogging\": true}}"
                )
            print('{}-Task Status:{}'.format(task_name,response['ReplicationTask']['Status']))

            #Poll DMS task every 15 secs to identify if the task state set back to Ready
            print('Waiting for the task to be Ready State')
            waiter_min = 20 #5 mins - Sleep 15 secs each time
            for attempt in range(waiter_min) :
                response = client.describe_replication_tasks(
                        Filters = [
                            {
                                'Name' : 'replication-task-id',
                                'Values' : [ task_name ]
                            }
                        ],
                        WithoutSettings=True
                    )

                if response['ReplicationTasks'][0]['Status'] == "ready" :
                    print('Task status is Ready and resuming/start the task')
                    break
                else :
                    time.sleep(15)
                    print(attempt)

            #Calling boto3 api start_replicaiton_task for starting the DMS task
            print('Starting replication_task:{}'.format(task_name))
            response = client.start_replication_task(
                    ReplicationTaskArn=taskArn,
                    StartReplicationTaskType= 'start-replication'
                )
            print('{} task status : {}'.format(task_name,response['ReplicationTask']['Status']))