def test_simple_ssml_via_synthesizer_node()

in tts/integration_test/tts_integration.py [0:0]


    def test_simple_ssml_via_synthesizer_node(self):
        node = rclpy.create_node('integtest')
        client = node.create_client(Synthesizer, 'synthesizer')

        retries = 0
        while not client.wait_for_service(timeout_sec=2.0):
            retries += 1
            self.failIf(retries > 3, 'service is not available')

        text = '<speak>Mary has a little lamb, little lamb, little lamb.</speak>'
        request = Synthesizer.Request(text=text, metadata='''{"text_type":"ssml"}''')
        future = client.call_async(request)

        while rclpy.ok():
            rclpy.spin_once(node)
            if future.done():
                self.failIf(future.result() is None, 'nothing is returned')
                break
            print('Waiting for service to be done.')

        res = future.result()

        self.assertIsNotNone(res)
        self.assertTrue(type(res) is Synthesizer.Response)

        r = json.loads(res.result)
        self.assertIn('Audio Type', r, 'result should contain audio type')
        self.assertIn('Audio File', r, 'result should contain file path')
        self.assertIn('Amazon Polly Response Metadata', r, 'result should contain metadata')

        audio_type = r['Audio Type']
        audio_file = r['Audio File']
        md = r['Amazon Polly Response Metadata']
        self.assertTrue("'HTTPStatusCode': 200," in md)
        self.assertEqual('audio/pcm', audio_type)
        self.assertTrue(audio_file.endswith('.wav'))

        import subprocess
        o = subprocess.check_output(['file', audio_file], stderr=subprocess.STDOUT)
        import re
        m = re.search(r'.*WAVE audio.*', o.decode('utf-8'), flags=re.MULTILINE)
        self.assertIsNotNone(m)

        node.destroy_node()