in scripts/testConnection.py [0:0]
def check_connection(endpoint, rep):
    """Run a DMS connection test for an endpoint and report its status.

    Starts a connection test between the replication instance and the
    endpoint, blocks on the ``test_connection_succeeds`` waiter, then looks
    the connection up again, prints its status and returns it.

    Args:
        endpoint: ARN of the DMS endpoint to test.
        rep: ARN of the replication instance used for the test.

    Returns:
        The ``Status`` value of the first connection that
        ``describe_connections`` returns for this endpoint/instance pair.
    """
    client = boto3.client('dms')

    # The waiter and the follow-up lookup select the same connection, so the
    # filter list is built once and passed to both calls.
    arn_filters = [
        {'Name': 'endpoint-arn', 'Values': [endpoint]},
        {'Name': 'replication-instance-arn', 'Values': [rep]},
    ]

    client.test_connection(ReplicationInstanceArn=rep, EndpointArn=endpoint)

    # NOTE(review): the waiter presumably raises (botocore WaiterError) when
    # the test ends in a failed state or times out, in which case the lines
    # below never run -- confirm callers expect an exception rather than a
    # returned failure status.
    client.get_waiter('test_connection_succeeds').wait(Filters=arn_filters)

    matches = client.describe_connections(Filters=arn_filters)['Connections']
    result = matches[0]['Status']
    print('The connection test was %s' % (result))
    return result