start() — method excerpted from testing/vcttesting/hgmo.py



    def start(self, master_ssh_port=None, show_output=False):
        """Start the cluster.

        ``master_ssh_port``, if provided, is the host port on which to
        publish the master's SSH service; when omitted, the port mapping
        is left blank in the compose file so the cluster can later be
        torn down without knowing the port.

        If ``show_output`` is True, docker-compose output is inherited by
        this process; otherwise it is discarded.

        Returns ``self.get_state()`` for the freshly started cluster.
        Raises ``Exception`` if the cluster name is unset or containers
        for this project already exist; exits the process if
        docker-compose fails to bring the cluster up.
        """
        if not self.testname:
            raise Exception("cluster name is not set")

        # Refuse to start on top of leftover containers from a previous run.
        if self.get_cluster_containers(onetime=True):
            raise Exception(
                "pre-existing containers exist for project %s;\n"
                "(try running `hgmo clean` or `docker container rm`)" % self.testname
            )

        # docker-compose needs the arguments in this order
        docker_compose_up_command = [
            "docker-compose",
            # Use the `hgcluster-docker-compose` file
            "--file",
            HGCLUSTER_DOCKER_COMPOSE,
            # Specify the project name for cluster management via container labels
            "--project-name",
            self.testname,
            "up",
            # Always recreate containers and volumes
            "--force-recreate",
            "--renew-anon-volumes",
            # Use detached mode to run containers in the background
            "-d",
        ]

        newenv = os.environ.copy()

        if master_ssh_port:
            # Use a `:` here, so that leaving the field blank will cause it to be
            # empty in the docker-compose file, allowing us to `docker-compose down`
            # without knowing the master ssh port of the cluster.
            newenv["MASTER_SSH_PORT"] = "%d:" % master_ssh_port

        kwargs = {"env": newenv}
        if not show_output:
            kwargs["stderr"] = subprocess.DEVNULL
            kwargs["stdout"] = subprocess.DEVNULL

        compose_up_process = subprocess.Popen(docker_compose_up_command, **kwargs)

        try:
            cluster_containers = self.get_cluster_containers()
        except Exception as e:
            print(e, file=sys.stderr)
            print("\n", file=sys.stderr)

            return_code = compose_up_process.wait()

            # NOTE: stderr is never captured with subprocess.PIPE (it is
            # either inherited or sent to DEVNULL above), so
            # `compose_up_process.stderr` is always None. Only the exit
            # code is reportable here.
            print(
                "docker-compose errored with exit code %d" % return_code,
                file=sys.stderr,
            )
            sys.exit(1)

        # Containers with "hg" in the name (hgssh + hgweb*); the hgweb
        # subset are the web heads.
        all_states = {
            name: state for name, state in cluster_containers.items() if "hg" in name
        }
        web_states = {
            name: state for name, state in all_states.items() if "hgweb" in name
        }

        # Number of hg related containers, and number of web heads
        n_web = len(web_states)
        n_hg = len(all_states)

        # Fail early here so we aren't surprised if `network_name` is an unexpected value
        if not all(
            len(state.attrs["NetworkSettings"]["Networks"]) == 1
            for state in all_states.values()
        ):
            raise Exception("Each container should only have one network attached")

        network_name = list(
            cluster_containers["hgssh"].attrs["NetworkSettings"]["Networks"].keys()
        )[0]

        master_id = cluster_containers["hgssh"].id
        web_ids = [state.id for state in web_states.values()]

        # Obtain replication and host SSH keys.
        (
            mirror_private_key,
            mirror_public_key,
            master_host_ed25519_key,
            master_host_rsa_key,
        ) = self.get_mirror_ssh_keys(master_id)

        with futures.ThreadPoolExecutor(n_web) as e:
            # Set SSH keys on hgweb instances.
            cmd = [
                "/set-mirror-key.py",
                mirror_private_key,
                mirror_public_key,
                "hgssh",
                # FUTURE this will need updated once hgweb supports ed25519 keys
                master_host_rsa_key,
            ]
            for i in web_ids:
                e.submit(self._d.execute, i, cmd)

        # The host SSH keys are populated during container start as part of
        # entrypoint.py. There is a race between the keys being generated and
        # us fetching them. We wait on a network service (Kafka) started in
        # entrypoint.py after SSH keys are generated to eliminate this race
        # condition.
        futures_list = []
        with futures.ThreadPoolExecutor(n_web) as e:
            for name, state in web_states.items():
                # Wait until we can access Kafka from the host machine
                host, port = self._d._get_host_hostname_port(state.attrs, "9092/tcp")
                futures_list.append(
                    e.submit(wait_for_kafka, "%s:%s" % (host, port), 180)
                )

        # Re-raise any exception from the Kafka waits.
        for f in futures_list:
            f.result()

        f_mirror_host_keys = []
        with futures.ThreadPoolExecutor(n_web) as e:
            # Obtain host keys from mirrors.
            for state in web_states.values():
                f_mirror_host_keys.append(
                    (
                        state.attrs["NetworkSettings"]["Networks"][network_name][
                            "IPAddress"
                        ],
                        e.submit(
                            self._d.get_file_content,
                            state.id,
                            "/etc/ssh/ssh_host_rsa_key.pub",
                        ),
                    )
                )

        # Tell the master about all the mirrors.
        args = ["/set-mirrors.py"]
        for ip, f_key in f_mirror_host_keys:
            key = f_key.result().decode("utf-8").strip()
            # Keep only the key type and key material; drop any trailing comment.
            key = " ".join(key.split()[0:2])
            args.extend([ip, key])
        self._d.execute(master_id, args)

        master_ssh_hostname, master_ssh_hostport = self._d._get_host_hostname_port(
            cluster_containers["hgssh"].attrs, "22/tcp"
        )
        pulse_hostname, pulse_hostport = self._d._get_host_hostname_port(
            cluster_containers["pulse"].attrs, "5672/tcp"
        )

        futures_list = []
        # 4 threads, one for each service we need to wait on
        with futures.ThreadPoolExecutor(4) as e:
            futures_list.append(
                e.submit(self.ldap.create_vcs_sync_login, mirror_public_key)
            )
            futures_list.append(
                e.submit(
                    wait_for_amqp, pulse_hostname, pulse_hostport, "guest", "guest"
                )
            )
            futures_list.append(
                e.submit(wait_for_ssh, master_ssh_hostname, master_ssh_hostport)
            )
            # We already waited on the web nodes above. So only need to
            # wait on master here.
            h, p = self._d._get_host_hostname_port(
                cluster_containers["hgssh"].attrs, "9092/tcp"
            )
            futures_list.append(e.submit(wait_for_kafka, "%s:%s" % (h, p), 20))

        # Will re-raise exceptions.
        for f in futures_list:
            f.result()

        # Create Kafka topics. (topic name, partition count) pairs;
        # partition counts are strings because they are passed as CLI args.
        TOPICS = [
            ("pushdata", "8"),
            ("replicatedpushdatapending", "1"),
            ("replicatedpushdata", "1"),
        ]
        futures_list = []
        with futures.ThreadPoolExecutor(len(TOPICS)) as e:
            for topic, partitions in TOPICS:
                cmd = [
                    "/opt/kafka/bin/kafka-topics.sh",
                    "--create",
                    "--topic",
                    topic,
                    "--partitions",
                    partitions,
                    "--replication-factor",
                    str(n_hg),
                    "--config",
                    "min.insync.replicas=2",
                    "--config",
                    "unclean.leader.election.enable=false",
                    "--config",
                    "max.message.bytes=104857600",
                    "--zookeeper",
                    "hgssh:2181/hgmoreplication,hgweb0:2181/hgmoreplication,hgweb1:2181/hgmoreplication",
                ]
                futures_list.append(
                    e.submit(self._d.execute, master_id, cmd, stdout=True)
                )

        for f in futures.as_completed(futures_list):
            result = f.result()
            if "Created topic" not in result:
                raise Exception("kafka topic not created: %s" % result)

        # There appears to be a race condition between the topic being
        # created and the topic being available. So we explicitly wait
        # for the topic to appear on all clients so processes within
        # containers don't need to wait.
        with futures.ThreadPoolExecutor(4) as e:
            futures_list = []
            for state in all_states.values():
                h, p = self._d._get_host_hostname_port(state.attrs, "9092/tcp")
                hostport = "%s:%s" % (h, p)
                for topic in TOPICS:
                    futures_list.append(
                        e.submit(wait_for_kafka_topic, hostport, topic[0])
                    )

            [f.result() for f in futures_list]

        return self.get_state()