in tts/integration_test/tts_integration.py [0:0]
def test_simple_ssml_via_polly_node(self):
    """Send SSML text to the 'polly' service and verify a WAV file is produced.

    The test creates a temporary node, waits for the 'polly' service,
    issues a ``SynthesizeSpeech`` request with SSML input, and then checks
    the JSON payload in the response: it must carry the audio type, the
    audio file path and the Amazon Polly response metadata. Finally the
    ``file`` utility is run on the reported path to confirm it is WAVE audio.
    """
    # Function-scope import: the file's top-level imports are not assumed here.
    import subprocess

    node = rclpy.create_node('integtest')
    try:
        client = node.create_client(Polly, 'polly')

        # Give up after the fourth unsuccessful 2-second wait.
        retries = 0
        while not client.wait_for_service(timeout_sec=2.0):
            retries += 1
            self.assertFalse(retries > 3, 'service is not available')

        text = '<speak>Mary has a little lamb, little lamb, little lamb.</speak>'
        request = Polly.Request(
            polly_action='SynthesizeSpeech', text=text, text_type='ssml')
        future = client.call_async(request)

        # Spin the node so the asynchronous call can make progress.
        while rclpy.ok():
            rclpy.spin_once(node)
            if future.done():
                self.assertFalse(future.result() is None, 'nothing is returned')
                break
            print('Waiting for service to be done.')

        res = future.result()
        self.assertIsNotNone(res)
        self.assertIsInstance(res, Polly.Response)

        r = json.loads(res.result)
        self.assertIn('Audio Type', r, 'result should contain audio type')
        self.assertIn('Audio File', r, 'result should contain file path')
        self.assertIn(
            'Amazon Polly Response Metadata', r, 'result should contain metadata')

        audio_type = r['Audio Type']
        audio_file = r['Audio File']
        md = r['Amazon Polly Response Metadata']
        # The metadata is checked as text, exactly as the original test did.
        self.assertIn("'HTTPStatusCode': 200,", md)
        self.assertEqual('audio/pcm', audio_type)
        self.assertTrue(
            audio_file.endswith('.wav'),
            'unexpected audio file name: {!r}'.format(audio_file))

        # Ask the `file` utility what the generated file actually contains.
        o = subprocess.check_output(['file', audio_file], stderr=subprocess.STDOUT)
        self.assertIn('WAVE audio', o.decode('utf-8'))
    finally:
        # Release the node even when an assertion above fails.
        node.destroy_node()