in testing/vcttesting/hgmo.py [0:0]
def start(self, master_ssh_port=None, show_output=False):
"""Start the cluster.
If ``coverage`` is True, code coverage for Python executions will be
obtained.
"""
if not self.testname:
raise Exception("cluster name is not set")
if self.get_cluster_containers(onetime=True):
raise Exception(
"pre-existing containers exist for project %s;\n"
"(try running `hgmo clean` or `docker container rm`)" % self.testname
)
# docker-compose needs the arguments in this order
docker_compose_up_command = [
"docker-compose",
# Use the `hgcluster-docker-compose` file
"--file",
HGCLUSTER_DOCKER_COMPOSE,
# Specify the project name for cluster management via container labels
"--project-name",
self.testname,
"up",
# Always recreate containers and volumes
"--force-recreate",
"--renew-anon-volumes",
# Use detached mode to run containers in the background
"-d",
]
newenv = os.environ.copy()
if master_ssh_port:
# Append a `:` so that when MASTER_SSH_PORT is not set, the port mapping in
# the docker-compose file is left empty, which lets us run
# `docker-compose down` without knowing the cluster's master SSH port.
newenv["MASTER_SSH_PORT"] = "%d:" % master_ssh_port
kwargs = {"env": newenv}
if not show_output:
kwargs["stderr"] = subprocess.DEVNULL
kwargs["stdout"] = subprocess.DEVNULL
compose_up_process = subprocess.Popen(docker_compose_up_command, **kwargs)
try:
cluster_containers = self.get_cluster_containers()
except Exception as e:
print(e, file=sys.stderr)
print("\n", file=sys.stderr)
return_code = compose_up_process.wait()
# stderr from docker-compose isn't captured (it goes to the console or
# DEVNULL), so only the exit code is available for the error message.
print(
"docker-compose errored with exit code %d" % return_code,
file=sys.stderr,
)
sys.exit(1)
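# Partition the cluster containers: every hg* container (hgssh plus the
# hgweb mirrors), and the hgweb web heads on their own.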
all_states = {
name: state for name, state in cluster_containers.items() if "hg" in name
}
web_states = {
name: state for name, state in all_states.items() if "hgweb" in name
}
# Number of web heads, and total number of hg-related containers
n_web = len(web_states)
n_hg = len(all_states)
# Fail early here so we aren't surprised if `network_name` is an unexpected value
if not all(
len(state.attrs["NetworkSettings"]["Networks"]) == 1
for state in all_states.values()
):
raise Exception("Each container should only have one network attached")
network_name = list(
cluster_containers["hgssh"].attrs["NetworkSettings"]["Networks"].keys()
)[0]
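# Container ids for the SSH master and each hgweb mirror.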
master_id = cluster_containers["hgssh"].id
web_ids = [state.id for state in web_states.values()]
# Obtain replication and host SSH keys.
(
mirror_private_key,
mirror_public_key,
master_host_ed25519_key,
master_host_rsa_key,
) = self.get_mirror_ssh_keys(master_id)
with futures.ThreadPoolExecutor(n_web) as e:
# Set SSH keys on hgweb instances.
cmd = [
"/set-mirror-key.py",
mirror_private_key,
mirror_public_key,
"hgssh",
# FUTURE this will need to be updated once hgweb supports ed25519 keys
master_host_rsa_key,
]
for i in web_ids:
e.submit(self._d.execute, i, cmd)
# The host SSH keys are populated during container start as part of
# entrypoint.py. There is a race between the keys being generated and
# us fetching them. We wait on a network service (Kafka) started in
# entrypoint.py after SSH keys are generated to eliminate this race
# condition.
futures_list = []
with futures.ThreadPoolExecutor(n_web) as e:
for name, state in web_states.items():
# Wait until we can access Kafka from the host machine
host, port = self._d._get_host_hostname_port(state.attrs, "9092/tcp")
futures_list.append(
e.submit(wait_for_kafka, "%s:%s" % (host, port), 180)
)
for f in futures_list:
f.result()
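# Pair each mirror's in-network IP address with a future that fetches its
# SSH host public key.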
f_mirror_host_keys = []
with futures.ThreadPoolExecutor(n_web) as e:
# Obtain host keys from mirrors.
for state in web_states.values():
f_mirror_host_keys.append(
(
state.attrs["NetworkSettings"]["Networks"][network_name][
"IPAddress"
],
e.submit(
self._d.get_file_content,
state.id,
"/etc/ssh/ssh_host_rsa_key.pub",
),
)
)
# Tell the master about all the mirrors.
args = ["/set-mirrors.py"]
for ip, f_key in f_mirror_host_keys:
key = f_key.result().decode("utf-8").strip()
key = " ".join(key.split()[0:2])
args.extend([ip, key])
self._d.execute(master_id, args)
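# Host-visible hostname/port mappings for the master's SSH service and the
# Pulse AMQP broker.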
master_ssh_hostname, master_ssh_hostport = self._d._get_host_hostname_port(
cluster_containers["hgssh"].attrs, "22/tcp"
)
pulse_hostname, pulse_hostport = self._d._get_host_hostname_port(
cluster_containers["pulse"].attrs, "5672/tcp"
)
futures_list = []
# 4 threads, one for each service we need to wait on
with futures.ThreadPoolExecutor(4) as e:
futures_list.append(
e.submit(self.ldap.create_vcs_sync_login, mirror_public_key)
)
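# Wait for the Pulse AMQP broker to accept the guest/guest credentials.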
futures_list.append(
e.submit(
wait_for_amqp, pulse_hostname, pulse_hostport, "guest", "guest"
)
)
futures_list.append(
e.submit(wait_for_ssh, master_ssh_hostname, master_ssh_hostport)
)
# We already waited on the web nodes above, so we only need to
# wait on the master here.
h, p = self._d._get_host_hostname_port(
cluster_containers["hgssh"].attrs, "9092/tcp"
)
futures_list.append(e.submit(wait_for_kafka, "%s:%s" % (h, p), 20))
# Will re-raise exceptions.
for f in futures_list:
f.result()
# Create Kafka topics.
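# Each entry is (topic name, partition count); counts are strings because
# they are passed straight to the kafka-topics.sh CLI.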
TOPICS = [
("pushdata", "8"),
("replicatedpushdatapending", "1"),
("replicatedpushdata", "1"),
]
futures_list = []
with futures.ThreadPoolExecutor(len(TOPICS)) as e:
for topic, partitions in TOPICS:
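# Create the topic via ZooKeeper, replicating it across every hg node and
# requiring at least 2 in-sync replicas.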
cmd = [
"/opt/kafka/bin/kafka-topics.sh",
"--create",
"--topic",
topic,
"--partitions",
partitions,
"--replication-factor",
str(n_hg),
"--config",
"min.insync.replicas=2",
"--config",
"unclean.leader.election.enable=false",
"--config",
"max.message.bytes=104857600",
"--zookeeper",
"hgssh:2181/hgmoreplication,hgweb0:2181/hgmoreplication,hgweb1:2181/hgmoreplication",
]
futures_list.append(
e.submit(self._d.execute, master_id, cmd, stdout=True)
)
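# kafka-topics.sh reports success on stdout; treat anything else as a
# failed topic creation.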
for f in futures.as_completed(futures_list):
result = f.result()
if "Created topic" not in result:
raise Exception("kafka topic not created: %s" % result)
# There appears to be a race condition between the topic being
# created and the topic being available. So we explicitly wait
# for the topic to appear on all clients so processes within
# containers don't need to wait.
with futures.ThreadPoolExecutor(4) as e:
futures_list = []
for state in all_states.values():
h, p = self._d._get_host_hostname_port(state.attrs, "9092/tcp")
hostport = "%s:%s" % (h, p)
for topic in TOPICS:
futures_list.append(
e.submit(wait_for_kafka_topic, hostport, topic[0])
)
for f in futures_list:
f.result()
return self.get_state()