testing/vcttesting/util.py (111 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. from __future__ import absolute_import, unicode_literals import os import socket import string import subprocess import time HERE = os.path.abspath(os.path.dirname(__file__)) ROOT = os.path.normpath(os.path.join(HERE, "..", "..")) HGCLUSTER_DOCKER_COMPOSE = os.path.join(ROOT, "testing", "hgcluster-docker-compose.yml") def get_available_port(): """Obtain a port number available for binding.""" s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("", 0)) _host, port = s.getsockname() s.close() return port def wait_for_amqp( hostname, port, userid, password, ssl=False, timeout=60, extra_check_fn=None ): # Delay import to facilitate module use in limited virtualenvs. import kombu c = kombu.Connection( hostname=hostname, port=port, userid=userid, password=password, ssl=ssl ) start = time.time() while True: try: c.connection return except Exception: pass if extra_check_fn: extra_check_fn() if time.time() - start > timeout: raise Exception("Timeout reached waiting for AMQP") time.sleep(0.1) def wait_for_ssh(hostname, port, timeout=60, extra_check_fn=None): """Wait for an SSH server to start on the specified host and port.""" # Delay import to facilitate module use in limited virtualenvs. import paramiko class IgnoreHostKeyPolicy(paramiko.MissingHostKeyPolicy): def missing_host_key(self, client, hostname, key): return start = time.time() while True: client = paramiko.SSHClient() client.set_missing_host_key_policy(IgnoreHostKeyPolicy()) try: client.connect( hostname, port=port, timeout=0.1, allow_agent=False, look_for_keys=False ) client.close() return except socket.error: pass except paramiko.SSHException: # This is probably wrong. We should ideally attempt authentication # and wait for an explicit auth failed instead of a generic # error. return if extra_check_fn: extra_check_fn() if time.time() - start > timeout: raise Exception("Timeout reached waiting for SSH") time.sleep(0.1) def wait_for_kafka(hostport, timeout=60): """Wait for Kafka to start responding on the specified host:port string.""" # Delay import to facilitate module use in limited virtualenvs. from kafka import SimpleClient start = time.time() while True: try: SimpleClient(hostport, client_id=b"dummy", timeout=1) return except Exception: pass if time.time() - start > timeout: raise Exception("Timeout reached waiting for Kafka") time.sleep(0.1) def wait_for_kafka_topic(hostport, topic, timeout=60): """Wait for a Kafka topic to become available.""" # Delay import to facilitate module use in limited virtualenvs. from kafka import SimpleClient, TopicPartition start = time.time() client = SimpleClient(hostport, client_id=b"dummy", timeout=1) while not client.has_metadata_for_topic(topic): if time.time() - start > timeout: raise Exception("timeout reached waiting for topic") time.sleep(0.1) client.load_metadata_for_topics() # And wait for all partitions in that topic to have a leader. while True: tps = [TopicPartition(topic, p) for p in client.topic_partitions.get(topic, [])] if tps and all(client.topics_to_brokers.get(tp) for tp in tps): break if time.time() - start > timeout: raise Exception("timeout reached waiting for topic brokers") time.sleep(0.1) client.load_metadata_for_topics() def docker_compose_down_background(project_name, show_output=False): """Run `docker-compose down` for the given project name. Returns the `subprocess.Popen` object for use by the caller. """ docker_compose_down_command = [ "docker-compose", "--file", HGCLUSTER_DOCKER_COMPOSE, "--project-name", project_name, "down", ] kwargs = {} if not show_output: # TRACKING py3 - once we have full Py3 support in the test environment # we can make use of `subprocess.DEVNULL` devnull = open(os.devnull, "wb") kwargs["stderr"] = devnull kwargs["stdout"] = devnull return subprocess.Popen(docker_compose_down_command, **kwargs) def normalize_testname(testname): """Normalize test name for use with `docker-compose`. `docker-compose` normalizes project names by removing whitespace and other punctuation. >>> normalize_testname('test-push-basic.t') 'testpushbasict' >>> normalize_testname('this is a test') 'thisisatest' >>> normalize_testname('lolol!!!!!! haha') 'lololhaha' >>> normalize_testname('hello/there/testname') testname >>> normalize_testname(None) None """ if not testname: return None # TODO names like "/" will break this testname = testname.split("/")[-1].lower() return "".join( char for char in testname if char not in (set(string.punctuation) | set(string.whitespace)) )