# testing/vcttesting/hgmo.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import os
import subprocess
import sys
import time
import uuid

import yaml

from .ldap import LDAP
from .util import (
    docker_compose_down_background,
    normalize_testname,
    wait_for_amqp,
    wait_for_kafka,
    wait_for_kafka_topic,
    wait_for_ssh,
)
# Directory containing this file.
HERE = os.path.abspath(os.path.dirname(__file__))
# Repository root: two levels up from testing/vcttesting.
ROOT = os.path.normpath(os.path.join(HERE, "..", ".."))
# docker-compose definition describing the hg.mozilla.org replica cluster.
HGCLUSTER_DOCKER_COMPOSE = os.path.join(ROOT, "testing", "hgcluster-docker-compose.yml")
class HgCluster(object):
    """Interface to a cluster of HG servers.

    This class manages Docker containers and environments that replicate the
    hg.mozilla.org server configuration.
    """

    # Maps checkout-relative source paths to the location where the file is
    # installed inside the master (hgssh) container. Consumed by
    # ``aggregate_code_coverage`` to translate in-container coverage paths
    # back to paths in this checkout.
    MASTER_FILE_MAP = {
        "hgext/pushlog/feed.py": "/var/hg/version-control-tools/hgext/pushlog/feed.py",
        "hgext/pushlog/__init__.py": "/var/hg/version-control-tools/hgext/pushlog/__init__.py",
        "hgext/serverlog/__init__.py": "/var/hg/version-control-tools/hgext/serverlog/__init__.py",
        "hgserver/pash/pash.py": "/usr/local/bin/pash.py",
        "hgserver/pash/hg_helper.py": "/usr/local/bin/hg_helper.py",
        "hgserver/hgmolib/hgmolib/ldap_helper.py": "/usr/local/bin/ldap_helper.py",
        "hgserver/pash/repo_group.py": "/usr/local/bin/repo_group.py",
        "hgserver/pash/sh_helper.py": "/usr/local/bin/sh_helper.py",
    }
def __init__(self, docker):
self._d = docker
self.testname = normalize_testname(os.getenv("TESTNAME"))
with open(HGCLUSTER_DOCKER_COMPOSE) as f:
self.docker_compose_content = yaml.safe_load(f)
if not self.testname:
print(
"$TESTNAME environment variable not set - commands will fail without "
"specifying a cluster name."
)
def get_cluster_containers(self, onetime=False):
"""Return containers corresponding to the cluster with the specified name."""
import time
initial = time.time()
while True:
project_containers = self._d.client.containers.list(
# Use sparse to avoid inspecting each container for information. Also
# avoids a race condition when attempting to inspect a container that no
# longer exists, resulting in a stack trace.
sparse=True,
filters={"label": "com.docker.compose.project=%s" % self.testname},
)
if onetime:
break
# Wait until ports are properly exposed
if len(project_containers) == len(self.docker_compose_content["services"]):
break
if time.time() - initial > 60:
raise Exception("timeout reached waiting for all 5 containers")
# Call `reload` to acquire data about our sparsely acquired objects
for container in project_containers:
container.reload()
return {
container.labels["com.docker.compose.service"]: container
for container in project_containers
}
def get_state(self):
"""Return a dict containing variables to be exported into the test environment shell."""
containers = self.get_cluster_containers()
params = {}
(
params["pulse_hostname"],
params["pulse_hostport"],
) = self._d._get_host_hostname_port(containers["pulse"].attrs, "5672/tcp")
(
params["master_ssh_hostname"],
params["master_ssh_port"],
) = self._d._get_host_hostname_port(containers["hgssh"].attrs, "22/tcp")
(
_,
_,
params["master_host_ed25519_key"],
params["master_host_rsa_key"],
) = self.get_mirror_ssh_keys(containers["hgssh"].id)
params["ldap_uri"] = "ldap://%s:%s" % self._d._get_host_hostname_port(
containers["ldap"].attrs, "389/tcp"
)
params["master_id"] = containers["hgssh"].id
params["hgweb_0_url"] = "http://%s:%s/" % self._d._get_host_hostname_port(
containers["hgweb0"].attrs, "80/tcp"
)
params["hgweb_1_url"] = "http://%s:%s/" % self._d._get_host_hostname_port(
containers["hgweb1"].attrs, "80/tcp"
)
params["hgweb_0_cid"] = containers["hgweb0"].id
params["hgweb_1_cid"] = containers["hgweb1"].id
params["kafka_0_hostport"] = "%s:%s" % self._d._get_host_hostname_port(
containers["hgssh"].attrs, "9092/tcp"
)
params["kafka_1_hostport"] = "%s:%s" % self._d._get_host_hostname_port(
containers["hgweb0"].attrs, "9092/tcp"
)
params["kafka_2_hostport"] = "%s:%s" % self._d._get_host_hostname_port(
containers["hgweb1"].attrs, "9092/tcp"
)
return params
@staticmethod
def build(image=None):
"""Build the hgcluster images."""
docker_compose_build_command = [
"docker-compose",
# Use the `hgcluster-docker-compose` file
"--file",
HGCLUSTER_DOCKER_COMPOSE,
"build",
"--parallel",
# Specify which images to avoid building hgweb twice
"hgweb0",
"hgssh",
"pulse",
"ldap",
]
if image:
docker_compose_build_command.append(image)
subprocess.run(
docker_compose_build_command,
check=True,
)
    def start(self, master_ssh_port=None, show_output=False):
        """Start the cluster.

        ``master_ssh_port``, when given, is exported as ``MASTER_SSH_PORT``
        for the compose file (presumably to pin the published SSH port of
        the hgssh container — confirm against the compose file).
        ``show_output`` controls whether docker-compose output is shown or
        discarded.

        Blocks until all services are reachable, replication SSH keys are
        distributed to the mirrors, and Kafka topics exist, then returns
        the cluster state dict (see ``get_state``).
        """
        if not self.testname:
            raise Exception("cluster name is not set")

        # Refuse to start on top of leftovers from a previous run.
        if self.get_cluster_containers(onetime=True):
            raise Exception(
                "pre-existing containers exist for project %s;\n"
                "(try running `hgmo clean` or `docker container rm`)" % self.testname
            )

        # docker-compose needs the arguments in this order
        docker_compose_up_command = [
            "docker-compose",
            # Use the `hgcluster-docker-compose` file
            "--file",
            HGCLUSTER_DOCKER_COMPOSE,
            # Specify the project name for cluster management via container labels
            "--project-name",
            self.testname,
            "up",
            # Always recreate containers and volumes
            "--force-recreate",
            "--renew-anon-volumes",
            # Use detached mode to run containers in the background
            "-d",
        ]

        newenv = os.environ.copy()
        if master_ssh_port:
            # Use a `:` here, so that leaving the field blank will cause it to be
            # empty in the docker-compose file, allowing us to `docker-compose down`
            # without knowing the master ssh port of the cluster.
            newenv["MASTER_SSH_PORT"] = "%d:" % master_ssh_port

        kwargs = {"env": newenv}
        if not show_output:
            kwargs["stderr"] = subprocess.DEVNULL
            kwargs["stdout"] = subprocess.DEVNULL

        compose_up_process = subprocess.Popen(docker_compose_up_command, **kwargs)

        try:
            # Blocks until all compose services have containers (60s timeout).
            cluster_containers = self.get_cluster_containers()
        except Exception as e:
            print(e, file=sys.stderr)
            print("\n", file=sys.stderr)
            return_code = compose_up_process.wait()
            # NOTE(review): stderr is never piped (it is either inherited or
            # DEVNULL above), so `compose_up_process.stderr` is always None
            # here and this message prints "None" instead of actual output.
            print(
                "docker-compose errored with exit code %d: stderr:\n%s"
                % (return_code, compose_up_process.stderr),
                file=sys.stderr,
            )
            sys.exit(1)

        # hg-related containers (hgssh + hgweb*) and the web-only subset.
        all_states = {
            name: state for name, state in cluster_containers.items() if "hg" in name
        }
        web_states = {
            name: state for name, state in all_states.items() if "hgweb" in name
        }

        # Number of hg related containers, and number of web heads
        n_web = len(web_states)
        n_hg = len(all_states)

        # Fail early here so we aren't surprised if `network_name` is an unexpected value
        if not all(
            len(state.attrs["NetworkSettings"]["Networks"]) == 1
            for state in all_states.values()
        ):
            raise Exception("Each container should only have one network attached")

        network_name = list(
            cluster_containers["hgssh"].attrs["NetworkSettings"]["Networks"].keys()
        )[0]

        master_id = cluster_containers["hgssh"].id
        web_ids = [state.id for state in web_states.values()]

        # Obtain replication and host SSH keys.
        (
            mirror_private_key,
            mirror_public_key,
            master_host_ed25519_key,
            master_host_rsa_key,
        ) = self.get_mirror_ssh_keys(master_id)

        with futures.ThreadPoolExecutor(n_web) as e:
            # Set SSH keys on hgweb instances.
            cmd = [
                "/set-mirror-key.py",
                mirror_private_key,
                mirror_public_key,
                "hgssh",
                # FUTURE this will need updated once hgweb supports ed25519 keys
                master_host_rsa_key,
            ]
            for i in web_ids:
                e.submit(self._d.execute, i, cmd)

        # The host SSH keys are populated during container start as part of
        # entrypoint.py. There is a race between the keys being generated and
        # us fetching them. We wait on a network service (Kafka) started in
        # entrypoint.py after SSH keys are generated to eliminate this race
        # condition.
        futures_list = []
        with futures.ThreadPoolExecutor(n_web) as e:
            for name, state in web_states.items():
                # Wait until we can access Kafka from the host machine
                host, port = self._d._get_host_hostname_port(state.attrs, "9092/tcp")
                futures_list.append(
                    e.submit(wait_for_kafka, "%s:%s" % (host, port), 180)
                )

        # Propagate any failure from the Kafka waits.
        for f in futures_list:
            f.result()

        f_mirror_host_keys = []
        with futures.ThreadPoolExecutor(n_web) as e:
            # Obtain host keys from mirrors.
            for state in web_states.values():
                f_mirror_host_keys.append(
                    (
                        state.attrs["NetworkSettings"]["Networks"][network_name][
                            "IPAddress"
                        ],
                        e.submit(
                            self._d.get_file_content,
                            state.id,
                            "/etc/ssh/ssh_host_rsa_key.pub",
                        ),
                    )
                )

        # Tell the master about all the mirrors.
        args = ["/set-mirrors.py"]
        for ip, f_key in f_mirror_host_keys:
            key = f_key.result().decode("utf-8").strip()
            # Keep only the key type and key material (drop trailing comment).
            key = " ".join(key.split()[0:2])
            args.extend([ip, key])
        self._d.execute(master_id, args)

        master_ssh_hostname, master_ssh_hostport = self._d._get_host_hostname_port(
            cluster_containers["hgssh"].attrs, "22/tcp"
        )
        pulse_hostname, pulse_hostport = self._d._get_host_hostname_port(
            cluster_containers["pulse"].attrs, "5672/tcp"
        )

        futures_list = []
        # 4 threads, one for each service we need to wait on
        with futures.ThreadPoolExecutor(4) as e:
            futures_list.append(
                e.submit(self.ldap.create_vcs_sync_login, mirror_public_key)
            )
            futures_list.append(
                e.submit(
                    wait_for_amqp, pulse_hostname, pulse_hostport, "guest", "guest"
                )
            )
            futures_list.append(
                e.submit(wait_for_ssh, master_ssh_hostname, master_ssh_hostport)
            )
            # We already waited on the web nodes above. So only need to
            # wait on master here.
            h, p = self._d._get_host_hostname_port(
                cluster_containers["hgssh"].attrs, "9092/tcp"
            )
            futures_list.append(e.submit(wait_for_kafka, "%s:%s" % (h, p), 20))

        # Will re-raise exceptions.
        for f in futures_list:
            f.result()

        # Create Kafka topics.
        TOPICS = [
            ("pushdata", "8"),
            ("replicatedpushdatapending", "1"),
            ("replicatedpushdata", "1"),
        ]
        futures_list = []
        with futures.ThreadPoolExecutor(len(TOPICS)) as e:
            for topic, partitions in TOPICS:
                cmd = [
                    "/opt/kafka/bin/kafka-topics.sh",
                    "--create",
                    "--topic",
                    topic,
                    "--partitions",
                    partitions,
                    "--replication-factor",
                    str(n_hg),
                    "--config",
                    "min.insync.replicas=2",
                    "--config",
                    "unclean.leader.election.enable=false",
                    "--config",
                    "max.message.bytes=104857600",
                    "--zookeeper",
                    "hgssh:2181/hgmoreplication,hgweb0:2181/hgmoreplication,hgweb1:2181/hgmoreplication",
                ]
                futures_list.append(
                    e.submit(self._d.execute, master_id, cmd, stdout=True)
                )

        for f in futures.as_completed(futures_list):
            result = f.result()
            if "Created topic" not in result:
                raise Exception("kafka topic not created: %s" % result)

        # There appears to be a race condition between the topic being
        # created and the topic being available. So we explicitly wait
        # for the topic to appear on all clients so processes within
        # containers don't need to wait.
        with futures.ThreadPoolExecutor(4) as e:
            futures_list = []
            for state in all_states.values():
                h, p = self._d._get_host_hostname_port(state.attrs, "9092/tcp")
                hostport = "%s:%s" % (h, p)
                for topic in TOPICS:
                    futures_list.append(
                        e.submit(wait_for_kafka_topic, hostport, topic[0])
                    )
            # Re-raise any failure from the topic waits.
            [f.result() for f in futures_list]

        return self.get_state()
def clean(self, cluster_name=None, show_output=False):
"""Clean the cluster.
Containers will be shut down and removed. The state file will
destroyed.
"""
cluster_name = normalize_testname(cluster_name or self.testname)
docker_compose_down_background(cluster_name, show_output=show_output)
@property
def ldap(self):
state = self.get_state()
return LDAP(state["ldap_uri"], "cn=admin,dc=mozilla", "password")
def get_mirror_ssh_keys(self, master_id=None):
with futures.ThreadPoolExecutor(4) as e:
f_private_key = e.submit(
self._d.get_file_content, master_id, "/etc/mercurial/mirror"
)
f_public_key = e.submit(
self._d.get_file_content, master_id, "/etc/mercurial/mirror.pub"
)
f_host_ed25519_key = e.submit(
self._d.get_file_content,
master_id,
"/etc/mercurial/ssh/ssh_host_ed25519_key.pub",
)
f_host_rsa_key = e.submit(
self._d.get_file_content,
master_id,
"/etc/mercurial/ssh/ssh_host_rsa_key.pub",
)
host_ed25519_key = " ".join(
f_host_ed25519_key.result().decode("utf-8").split()[0:2]
)
host_rsa_key = " ".join(f_host_rsa_key.result().decode("utf-8").split()[0:2])
return (
f_private_key.result().decode("utf-8"),
f_public_key.result().decode("utf-8"),
host_ed25519_key,
host_rsa_key,
)
def create_repo(self, name, group="scm_level_1"):
"""Create a repository on the cluster.
``path`` is the path fragment the repository would be accessed under
at https://hg.mozilla.org. e.g. ``hgcustom/version-control-tools``.
The repository will be owned by the specified ``group``.
"""
cmd = ["/create-repo", name, group]
state = self.get_state()
return self._d.execute(state["master_id"], cmd, stdout=True, stderr=True)
def aggregate_code_coverage(self, destdir):
master_map = {}
for host, container in self.MASTER_FILE_MAP.items():
master_map[container] = os.path.join(ROOT, host)
for c in self._d.get_code_coverage(self.master_id, filemap=master_map):
dest = os.path.join(destdir, "coverage.%s" % uuid.uuid1())
c.write_file(dest)