qpid_tests/client/client-api-example-tests.py (223 lines of code) (raw):

#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """ client-api-examples-interop.py """ """ ** TODO Add XML Exchange tests """ from __future__ import absolute_import import os import shlex import subprocess import unittest import uuid import re from time import sleep import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', filename='./client-api-example-tests.log', filemode='w') ####################################################################################### # # !!! Configure your paths here !!! # ####################################################################################### ## If you set qpid_root on a source tree, from the default install for ## this script, you're good to go. If running from elsewhere against a ## source tree, set QPID_ROOT. If running from an installed system, ## set QPID_CPP_EXAMPLES, QPID_PYTHON_EXAMPLES, QPID_PYTHON_TOOLS, ## etc. to the directories below. qpid_root = os.getenv("QPID_ROOT", os.path.abspath("../../../../../../qpid")) logging.debug("Qpid Root: " + qpid_root) qpid_broker = os.getenv("QPID_BROKER", "localhost:5672") logging.debug("Qpid Broker: " + qpid_broker) ######################################################################################## # # If you are working from a source tree, setting the above paths is # sufficient. # # If your examples are installed somewhere else, you have to tell us # where examples in each language are kept # ######################################################################################## cpp_examples_path = os.getenv("QPID_CPP_EXAMPLES", qpid_root + "/cpp/examples/messaging/") python_examples_path = os.getenv("QPID_PYTHON_EXAMPLES", qpid_root + "/python/examples/api/") python_path = os.getenv("PYTHONPATH", qpid_root+"/python:" + qpid_root+"/extras/qmf/src/py") os.environ["PYTHONPATH"] = python_path logging.debug("PYTHONPATH: " + os.environ["PYTHONPATH"]) python_tools_path = os.getenv("QPID_PYTHON_TOOLS", qpid_root + "/tools/src/py/") logging.debug("QPID_PYTHON_TOOLS: " + python_tools_path) java_qpid_home = os.getenv("QPID_HOME", qpid_root + "/java/build/lib/") os.environ["QPID_HOME"] = java_qpid_home logging.debug("Java's QPID_HOME: " + os.environ["QPID_HOME"]) java_examples_path = os.getenv("QPID_JAVA_EXAMPLES", qpid_root + "/java/client/example/") find = "find " + java_qpid_home + " -name '*.jar'" args = shlex.split(find) popen = subprocess.Popen(args, stdout=subprocess.PIPE) out, err = popen.communicate() os.environ["CLASSPATH"] = java_examples_path + ":" + re.sub("\\n", ":", out) logging.debug("Java CLASSPATH = " + os.environ["CLASSPATH"]) java_invoke = "java " + "-Dlog4j.configuration=log4j.conf " ############################################################################################ drains = [ {'lang': 'CPP', 'command': cpp_examples_path + "drain" }, {'lang': 'PYTHON', 'command': python_examples_path + "drain"}, {'lang': 'JAVA', 'command': java_invoke + "org.apache.qpid.example.Drain"} ] spouts = [ {'lang': 'CPP', 'command': cpp_examples_path + "spout" }, {'lang': 'PYTHON', 'command': python_examples_path + "spout"}, {'lang': 'JAVA', 'command': java_invoke + "org.apache.qpid.example.Spout"} ] mapSenders = [ {'lang': 'CPP', 'command': cpp_examples_path + "map_sender" }, {'lang': 'JAVA', 'command': java_invoke + "org.apache.qpid.example.MapSender"} ] mapReceivers = [ {'lang': 'CPP', 'command': cpp_examples_path + "map_receiver" }, {'lang': 'JAVA', 'command': java_invoke + "org.apache.qpid.example.MapReceiver"} ] hellos = [ {'lang': 'CPP', 'command': cpp_examples_path + "hello_world" }, {'lang': 'PYTHON', 'command': python_examples_path + "hello" }, {'lang': 'JAVA', 'command': java_invoke + "org.apache.qpid.example.Hello"} ] wockyClients = [ {'lang': 'CPP', 'command': cpp_examples_path + "client" }, ] wockyServers = [ {'lang': 'CPP', 'command': cpp_examples_path + "server" }, ] shortWait = 0.5 longWait = 3 # use sparingly! class TestDrainSpout(unittest.TestCase): # setUp / tearDown def setUp(self): logging.debug('----------------------------') logging.debug('START: ' + self.tcaseName()) def tearDown(self): pass ############################################################################# # # Lemmas # ############################################################################# def tcaseName(self): return re.split('[.]', self.id())[-1] # Python utilities def qpid_config(self, args): commandS = python_tools_path + "qpid-config" + ' ' + args args = shlex.split(commandS) logging.debug("qpid_config(): " + commandS) popen = subprocess.Popen(args, stdout=subprocess.PIPE) out, err = popen.communicate() logging.debug("qpid-config() - out=" + str(out) + ", err=" + str(err)) # Send / receive methods in various languages def send(self, spout=spouts[0], content="", destination="amq.topic", create=1, wait=0): if wait: sleep(wait) createS = ";{create:always}" if create else "" addressS = "'" + destination + createS + "'" brokerS = "-b " + qpid_broker if spout['lang']=='CPP': contentS = " ".join(['--content',"'"+content+"'"]) if content else "" commandS = " ".join([spout['command'], brokerS, contentS, addressS]) elif spout['lang']=='PYTHON': commandS = " ".join([spout['command'], brokerS, addressS, content]) elif spout['lang']=='JAVA': brokerS = "-b guest:guest@" + qpid_broker commandS = " ".join([spout['command'], brokerS, "--content="+"'"+content+"'", addressS]) else: raise "Ain't no such language ...." logging.debug("send(): " + commandS) args = shlex.split(commandS) popen = subprocess.Popen(args, stdout=subprocess.PIPE) out, err = popen.communicate() logging.debug("send() - out=" + str(out) + ", err=" + str(err)) def receive(self, drain=drains[0], destination="amq.topic", delete=1): deleteS = ";{delete:always}" if delete else "" addressS = "'" + destination + deleteS + "'" brokerS = "-b " + qpid_broker optionS = "-c 1 -t 30" if drain['lang']=='CPP': commandS = " ".join([drain['command'], optionS, brokerS, optionS, addressS]) elif drain['lang']=='PYTHON': commandS = " ".join([drain['command'], brokerS, optionS, addressS]) elif drain['lang']=='JAVA': brokerS = "-b guest:guest@" + qpid_broker commandS = " ".join([drain['command'], brokerS, optionS, addressS]) else: raise "Ain't no such language ...." logging.debug("receive() " + commandS) args = shlex.split(commandS) popen = subprocess.Popen(args, stdout=subprocess.PIPE) out, err = popen.communicate() logging.debug("receive() - out=" + str(out) + ", err=" + str(err)) return out def subscribe(self, drain=drains[0], destination="amq.topic", create=0): optionS = "-t 30 -c 1" brokerS = "-b " + qpid_broker if drain['lang']=='CPP': commandS = " ".join([drain['command'], brokerS, optionS, destination]) elif drain['lang']=='PYTHON': commandS = " ".join([drain['command'], brokerS, optionS, destination]) elif drain['lang']=='JAVA': logging.debug("Java working directory: ") brokerS = "-b guest:guest@" + qpid_broker commandS = " ".join([drain['command'], brokerS, optionS, destination]) else: logging.debug("subscribe() - no such language!") raise "Ain't no such language ...." logging.debug("subscribe() " + commandS) args = shlex.split(commandS) return subprocess.Popen(args, stdout=subprocess.PIPE) def listen(self, popen): out,err = popen.communicate() logging.debug("listen(): out=" + str(out) + ", err=" + str(err)) return out ############################################################################# # # Tests # ############################################################################# # Hello world! def test_hello_world(self): for hello_world in hellos: args = shlex.split(hello_world['command']) popen = subprocess.Popen(args, stdout=subprocess.PIPE) out = popen.communicate()[0] logging.debug(out) self.assertTrue(out.find("world!") > 0) def test_jabberwocky(self): for i, s in enumerate(wockyServers): for j, c in enumerate(wockyClients): args = shlex.split(s['command']) server = subprocess.Popen(args, stdout=subprocess.PIPE) args = shlex.split(c['command']) client = subprocess.Popen(args, stdout=subprocess.PIPE) out = client.communicate()[0] logging.debug(out) self.assertTrue(out.find("BRILLIG") >= 0) server.terminate() def test_maps(self): for s in mapSenders: for r in mapReceivers: args = shlex.split(s['command']) sender = subprocess.Popen(args, stdout=subprocess.PIPE) args = shlex.split(r['command']) receiver = subprocess.Popen(args, stdout=subprocess.PIPE) out = receiver.communicate()[0] logging.debug(out) sender.terminate() def test_queues(self): for i, s in enumerate(spouts): for j, d in enumerate(drains): content = self.tcaseName() + ": " + s['lang'] + str(i) + " => " + d['lang'] + str(j) self.send(s, content=content, destination="hello_world", create=1) out = self.receive(d, destination="hello_world", delete=1) self.assertTrue(out.find(content) >= 0) def test_direct_exchange(self): for i, s in enumerate(spouts): for j, d in enumerate(drains): content = self.tcaseName() + ": " + s['lang'] + str(i) + " => " + d['lang'] + str(j) popen1 = self.subscribe(d, destination="amq.direct/subject") popen2 = self.subscribe(d, destination="amq.direct/subject") self.send(s, content=content, destination="amq.direct/subject", create=0, wait=2) out1 = self.listen(popen1) out2 = self.listen(popen2) self.assertTrue(out1.find(self.tcaseName()) >= 0) self.assertTrue(out2.find(self.tcaseName()) >= 0) def test_fanout_exchange(self): for i, s in enumerate(spouts): for j, d in enumerate(drains): content = self.tcaseName() + ": " + s['lang'] + str(i) + " => " + d['lang'] + str(j) popen1 = self.subscribe(d, destination="amq.fanout") popen2 = self.subscribe(d, destination="amq.fanout") self.send(s, content=content, destination="amq.fanout", create=0, wait=2) out1 = self.listen(popen1) out2 = self.listen(popen2) self.assertTrue(out1.find(self.tcaseName()) >= 0) self.assertTrue(out2.find(self.tcaseName()) >= 0) def test_topic_exchange(self): for i, s in enumerate(spouts): for j, d in enumerate(drains): content = self.tcaseName() + ": " + s['lang'] + str(i) + " => " + d['lang'] + str(j) popen1 = self.subscribe(d, destination="amq.topic" + "/" + s['lang'] + "." + d['lang']) popen2 = self.subscribe(d, destination="amq.topic" + "/" + "*" + "." + d['lang']) popen3 = self.subscribe(d, destination="amq.topic" + "/" + s['lang'] + "." + "*") popen4 = self.subscribe(d, destination="amq.topic" + "/" + "#" + "." + d['lang']) self.send(s, content=content, destination="amq.topic"+ "/" + s['lang'] + "." + d['lang'], create=0, wait=4) out1 = self.listen(popen1) out2 = self.listen(popen2) out3 = self.listen(popen3) out4 = self.listen(popen4) logging.debug("out1:"+out1) logging.debug("out2:"+out2) logging.debug("out3:"+out3) logging.debug("out4:"+out4) self.assertTrue(out1.find(self.tcaseName()) >= 0) self.assertTrue(out2.find(self.tcaseName()) >= 0) self.assertTrue(out3.find(self.tcaseName()) >= 0) self.assertTrue(out4.find(self.tcaseName()) >= 0) if __name__ == '__main__': unittest.main()