tools/minicluster/minicluster.py (645 lines of code) (raw):

#!/usr/bin/env python
"""Bring up and tear down a local Peloton "minicluster" in docker containers.

The module drives a docker daemon (through ``docker_client.Client``) to run
ZooKeeper, Cassandra, a Mesos master with agents, an optional kind-based k8s
cluster, and the Peloton applications. A module-level ``default_cluster`` is
created on import and is re-pointed at every new ``Minicluster`` instance.
"""

from collections import OrderedDict
import os
import requests
import random
import string
import sys
import time

import client
from docker_client import Client
import print_utils
import kind
import utils

# Identifiers of the Peloton applications. They are used as keys of
# Minicluster.APP_START_ORDER and of the `disabled_applications` mapping.
RESOURCE_MANAGER = 1
HOST_MANAGER = 2
PLACEMENT_ENGINE = 3
JOB_MANAGER = 4
ARCHIVER = 5
AURORABRIDGE = 6
APISERVER = 7
MOCK_CQOS = 8

# Directory that contains this file; bind-mounted paths are built from it.
work_dir = os.path.dirname(os.path.abspath(__file__))


class Minicluster(object):
    """A set of docker containers that together form a local Peloton cluster."""

    def __init__(self, config, disable_mesos=False,
                 enable_k8s=False, enable_peloton=False,
                 use_host_pool=False,
                 disabled_applications=None, zk_url=None):
        """
        Store the settings, create the docker client and compute the ports.

        :param config: dict of minicluster settings; it is mutated
            ('use_host_pool' is written here, ports are rewritten by isolate)
        :param disable_mesos: when True, setup() skips zk and mesos
        :param enable_k8s: when True, setup() also creates the kind cluster
        :param enable_peloton: when True, setup() starts the Peloton apps
        :param use_host_pool: stored in config['use_host_pool']
        :param disabled_applications: mapping of application id to a truthy
            flag; flagged apps are skipped by _setup_peloton. Defaults to a
            fresh empty dict for every instance.
        :param zk_url: optional external ZooKeeper address; when None the
            address of the local zk container is used
        """
        # A `{}` default would be shared by every instance, so build a new
        # dict per instance instead.
        if disabled_applications is None:
            disabled_applications = {}

        self.config = config
        self.disable_mesos = disable_mesos
        self.enable_k8s = enable_k8s
        self.enable_peloton = enable_peloton
        self.disabled_applications = disabled_applications
        self.zk_url = zk_url
        self.config['use_host_pool'] = use_host_pool

        self.k8s = kind.Kind(config["k8s_cluster_name"])
        self._peloton_client = None
        self._namespace = ""  # Used for isolating miniclusters from each other
        self.cli = Client(base_url="unix://var/run/docker.sock",
                          namespace=self._namespace)
        self._create_peloton_ports()
        self._create_mesos_ports()

        # The most recently constructed cluster becomes the module default.
        global default_cluster
        default_cluster = self

        # Defines the order in which the apps are started.
        # NB: HOST_MANAGER is tied to database migrations, so it is started
        # ahead of the other Peloton daemons (only the mock CQoS server is
        # listed before it).
        # TODO: Start all apps at the same time.
        self.APP_START_ORDER = OrderedDict(
            [
                (MOCK_CQOS, self.run_peloton_mockcqos),
                (HOST_MANAGER, self.run_peloton_hostmgr),
                (RESOURCE_MANAGER, self.run_peloton_resmgr),
                (PLACEMENT_ENGINE, self.run_peloton_placement),
                (JOB_MANAGER, self.run_peloton_jobmgr),
                (ARCHIVER, self.run_peloton_archiver),
                (AURORABRIDGE, self.run_peloton_aurorabridge),
                (APISERVER, self.run_peloton_apiserver),
            ]
        )

    def _create_mesos_ports(self):
        """Assign consecutive host ports to the regular, then exclusive, agents."""
        base = self.config["local_agent_port"]
        num_agents = self.config['num_agents']
        num_exclusive = self.config.get('num_exclusive_agents', 0)

        ports = [base + i for i in range(0, num_agents)]
        # Exclusive agents continue right after the last regular agent port.
        ports.extend(base + num_agents + i for i in range(0, num_exclusive))
        self.mesos_agent_ports = ports

    def _create_peloton_ports(self):
        """Compute one list of ports per instance for every Peloton app."""
        config = self.config

        def port_sets(instance_count, ports_key):
            # To not cause port conflicts among instances of the same app,
            # shift every base port by 10 for each additional instance.
            return [
                [port + i * 10 for port in config[ports_key]]
                for i in range(0, instance_count)
            ]

        self.resmgr_ports = port_sets(
            config["peloton_resmgr_instance_count"],
            "peloton_resmgr_ports")
        self.hostmgr_ports = port_sets(
            config["peloton_hostmgr_instance_count"],
            "peloton_hostmgr_ports")
        self.jobmgr_ports = port_sets(
            config["peloton_jobmgr_instance_count"],
            "peloton_jobmgr_ports")
        self.aurorabridge_ports = port_sets(
            config["peloton_aurorabridge_instance_count"],
            "peloton_aurorabridge_ports")
        self.apiserver_ports = port_sets(
            config["peloton_apiserver_instance_count"],
            "peloton_apiserver_ports")
        self.archiver_ports = port_sets(
            config["peloton_archiver_instance_count"],
            "peloton_archiver_ports")
        # Placement has one instance per configured task type.
        self.placement_ports = port_sets(
            len(config["peloton_placement_instances"]),
            "peloton_placement_ports")
        self.mockcqos_ports = port_sets(
            config["peloton_mock_cqos_instance_count"],
            "peloton_mock_cqos_ports")

    # Isolate changes the port numbers, container names, and other
    # config values that need to be unique on a single host.
    def isolate(self):
        """
        Give this cluster a random namespace and randomized ports.

        :return: the generated namespace string (three lowercase letters
            followed by '-')
        """
        # Generate a random string, which will be the "namespace" of the
        # minicluster. This namespace will be used to add random suffixes to
        # container names so that they do not collide on the same docker
        # daemon.
        # TODO: Use this namespace.
        letters = string.ascii_lowercase
        rand_str = ''.join(random.choice(letters) for i in range(3))
        self._namespace = rand_str + '-'
        self.cli = Client(base_url="unix://var/run/docker.sock",
                          namespace=self._namespace)

        # Now we need to randomize the ports.
        # TODO: Fix race condition between the find_free_port() and the process
        # that will actually bind to that port.
        self.config["local_master_port"] = utils.find_free_port()
        self.config["local_zk_port"] = utils.find_free_port()
        self.config["local_cassandra_cql_port"] = utils.find_free_port()
        self.config["local_cassandra_thrift_port"] = utils.find_free_port()

        self.mesos_agent_ports = utils.randomize_ports(self.mesos_agent_ports)

        self.resmgr_ports = utils.randomize_ports(self.resmgr_ports)
        self.hostmgr_ports = utils.randomize_ports(self.hostmgr_ports)
        self.jobmgr_ports = utils.randomize_ports(self.jobmgr_ports)
        self.aurorabridge_ports = utils.randomize_ports(
            self.aurorabridge_ports)
        self.apiserver_ports = utils.randomize_ports(self.apiserver_ports)
        self.archiver_ports = utils.randomize_ports(self.archiver_ports)
        self.placement_ports = utils.randomize_ports(self.placement_ports)
        self.mockcqos_ports = utils.randomize_ports(self.mockcqos_ports)

        # TODO: Save those to local disk, or print them to stdout.
        return self._namespace

    def setup(self):
        """Start the enabled parts of the cluster; cassandra always starts."""
        if self.enable_k8s:
            self._setup_k8s()

        if not self.disable_mesos:
            self._setup_mesos()

        self._setup_cassandra()
        if self.enable_peloton:
            self._setup_peloton()

    def teardown(self, stop=False):
        """
        Tear down every component of the cluster.

        :param stop: when True the Peloton containers are stopped instead of
            removed; all other containers are always removed
        """
        print_utils.okgreen("teardown started...")
        self._teardown_peloton(stop)
        self._teardown_mesos()
        self._teardown_k8s()
        self._teardown_cassandra()
        self._teardown_zk()
        print_utils.okgreen("teardown complete!")

    def set_mesos_agent_exclusive(self, index, exclusive_label_value):
        """Replace regular agent `index` with an exclusive one on the same port."""
        self.teardown_mesos_agent(index, is_exclusive=False)
        port = self.mesos_agent_ports[index]
        self.setup_mesos_agent(
            index, port, is_exclusive=True,
            exclusive_label_value=exclusive_label_value)

    def set_mesos_agent_nonexclusive(self, index):
        """Replace exclusive agent `index` with a regular one on the same port."""
        self.teardown_mesos_agent(index, is_exclusive=True)
        port = self.mesos_agent_ports[index]
        self.setup_mesos_agent(index, port)

    def setup_mesos_agent(self, index, local_port, is_exclusive=False,
                          exclusive_label_value=''):
        """
        Create and start one mesos agent container and wait until it is up.

        :param index: agent index, appended to the container name
        :param local_port: host port bound to the agent port of the container
        :param is_exclusive: when True the container name gets an
            "-exclusive" suffix and a "peloton/exclusive" attribute is added
        :param exclusive_label_value: value of the "peloton/exclusive"
            attribute
        """
        config = self.config
        prefix = config["mesos_agent_container"]
        attributes = config["attributes"]
        if is_exclusive:
            prefix += "-exclusive"
            attributes += ";peloton/exclusive:" + exclusive_label_value
        agent = prefix + repr(index)
        ctn_port = config["agent_port"]
        container = self.cli.create_container(
            name=agent,
            hostname=agent,
            volumes=["/files", "/var/run/docker.sock"],
            ports=[repr(ctn_port)],
            host_config=self.cli.create_host_config(
                port_bindings={ctn_port: local_port},
                binds=[
                    work_dir + "/files:/files",
                    work_dir
                    + "/mesos_config/etc_mesos-slave:/etc/mesos-slave",
                    "/var/run/docker.sock:/var/run/docker.sock",
                ],
                privileged=True,
            ),
            environment=[
                "MESOS_PORT=" + repr(ctn_port),
                "MESOS_MASTER=zk://{0}:{1}/mesos".format(
                    self.cli.get_container_ip(config["zk_container"]),
                    config["default_zk_port"],
                ),
                "MESOS_SWITCH_USER=" + repr(config["switch_user"]),
                "MESOS_CONTAINERIZERS=" + config["containers"],
                "MESOS_LOG_DIR=" + config["log_dir"],
                "MESOS_ISOLATION=" + config["isolation"],
                "MESOS_SYSTEMD_ENABLE_SUPPORT=false",
                "MESOS_IMAGE_PROVIDERS=" + config["image_providers"],
                "MESOS_IMAGE_PROVISIONER_BACKEND={0}".format(
                    config["image_provisioner_backend"]
                ),
                "MESOS_APPC_STORE_DIR=" + config["appc_store_dir"],
                "MESOS_WORK_DIR=" + config["work_dir"],
                "MESOS_RESOURCES=" + config["resources"],
                "MESOS_ATTRIBUTES=" + attributes,
                "MESOS_MODULES=" + config["modules"],
                "MESOS_RESOURCE_ESTIMATOR=" + config["resource_estimator"],
                "MESOS_OVERSUBSCRIBED_RESOURCES_INTERVAL="
                + config["oversubscribed_resources_interval"],
                "MESOS_QOS_CONTROLLER=" + config["qos_controller"],
                "MESOS_QOS_CORRECTION_INTERVAL_MIN="
                + config["qos_correction_interval_min"],
            ],
            image=config["mesos_slave_image"],
            entrypoint="bash /files/run_mesos_slave.sh",
            detach=True,
        )
        self.cli.start(container=container.get("Id"))
        utils.wait_for_up(agent, local_port, "state.json")

    def teardown_mesos_agent(self, index, is_exclusive=False):
        """Remove the (regular or exclusive) agent container with this index."""
        prefix = self.config["mesos_agent_container"]
        if is_exclusive:
            prefix += "-exclusive"
        agent = prefix + repr(index)
        self.cli.remove_existing_container(agent)

    def _setup_k8s(self):
        """Recreate the kind k8s cluster from scratch."""
        print_utils.okgreen("starting k8s cluster")
        self.k8s.teardown()
        self.k8s.create()
        print_utils.okgreen("started k8s cluster")

    def _teardown_k8s(self):
        """Tear down the kind k8s cluster."""
        self.k8s.teardown()

    def _setup_mesos(self):
        """Remove any old mesos/zk containers, then start zk, master, agents."""
        self._teardown_mesos()
        self._teardown_zk()
        self._setup_zk()
        self._setup_mesos_master()
        self._setup_mesos_agents()

    def _teardown_mesos(self):
        """Remove all agent containers, the master and orphaned mesos containers."""
        # 1 - Remove all Mesos Agents
        for i in range(0, self.config["num_agents"]):
            self.teardown_mesos_agent(i)
        for i in range(0, self.config.get("num_exclusive_agents", 0)):
            self.teardown_mesos_agent(i, is_exclusive=True)

        # 2 - Remove Mesos Master
        self.cli.remove_existing_container(
            self.config["mesos_master_container"])

        # 3 - Remove orphaned mesos containers.
        for c in self.cli.containers(filters={"name": "^/mesos-"}, all=True):
            self.cli.remove_existing_container(c.get("Id"))

    def _setup_zk(self):
        """
        Pull, create and start the zk container, then poll until it is ready.

        :raises Exception: when zk is not ready after
            utils.max_retry_attempts polls
        """
        config = self.config
        self.cli.pull(config["zk_image"])
        container = self.cli.create_container(
            name=config["zk_container"],
            hostname=config["zk_container"],
            host_config=self.cli.create_host_config(
                port_bindings={
                    config["default_zk_port"]: config["local_zk_port"],
                },
            ),
            image=config["zk_image"],
            detach=True,
        )
        self.cli.start(container=container.get("Id"))
        print_utils.okgreen("started container %s" % config["zk_container"])
        print_utils.okgreen("waiting on %s to be rdy" % config["zk_container"])

        count = 0
        while count < utils.max_retry_attempts:
            count += 1
            if utils.is_zk_ready(config["local_zk_port"]):
                return
            time.sleep(utils.sleep_time_secs)

        raise Exception("zk failed to come up in time")

    def _teardown_zk(self):
        """Remove the zk container."""
        self.cli.remove_existing_container(self.config["zk_container"])

    def _setup_cassandra(self):
        """Recreate and start the cassandra container, then create the store."""
        config = self.config
        self.cli.remove_existing_container(config["cassandra_container"])
        self.cli.pull(config["cassandra_image"])
        container = self.cli.create_container(
            name=config["cassandra_container"],
            hostname=config["cassandra_container"],
            host_config=self.cli.create_host_config(
                port_bindings={
                    config["cassandra_cql_port"]: config[
                        "local_cassandra_cql_port"
                    ],
                    config["cassandra_thrift_port"]: config[
                        "local_cassandra_thrift_port"
                    ],
                },
                binds=[work_dir + "/files:/files"],
            ),
            environment=["MAX_HEAP_SIZE=1G", "HEAP_NEWSIZE=256M"],
            image=config["cassandra_image"],
            detach=True,
            entrypoint="bash /files/run_cassandra_with_stratio_index.sh",
        )
        self.cli.start(container=container.get("Id"))
        print_utils.okgreen("started container %s" %
                            config["cassandra_container"])

        self._create_cassandra_store()

    def _teardown_cassandra(self):
        """Remove the cassandra container."""
        self.cli.remove_existing_container(self.config["cassandra_container"])

    def _setup_mesos_master(self):
        """Pull, create and start the mesos master container."""
        config = self.config
        self.cli.pull(config["mesos_master_image"])
        container = self.cli.create_container(
            name=config["mesos_master_container"],
            hostname=config["mesos_master_container"],
            volumes=["/files"],
            ports=[repr(config["master_port"])],
            host_config=self.cli.create_host_config(
                port_bindings={config["master_port"]: config[
                    "local_master_port"
                ]},
                binds=[
                    work_dir + "/files:/files",
                    work_dir
                    + "/mesos_config/etc_mesos-master:/etc/mesos-master",
                ],
                privileged=True,
            ),
            environment=[
                "MESOS_AUTHENTICATE_HTTP_READWRITE=true",
                "MESOS_AUTHENTICATE_FRAMEWORKS=true",
                # TODO: Enable following flags for full authentication.
                "MESOS_AUTHENTICATE_HTTP_FRAMEWORKS=true",
                "MESOS_HTTP_FRAMEWORK_AUTHENTICATORS=basic",
                "MESOS_CREDENTIALS=/etc/mesos-master/credentials",
                "MESOS_LOG_DIR=" + config["log_dir"],
                "MESOS_PORT=" + repr(config["master_port"]),
                "MESOS_ZK=zk://{0}:{1}/mesos".format(
                    self.cli.get_container_ip(config["zk_container"]),
                    config["default_zk_port"],
                ),
                "MESOS_QUORUM=" + repr(config["quorum"]),
                "MESOS_REGISTRY=" + config["registry"],
                "MESOS_WORK_DIR=" + config["work_dir"],
            ],
            image=config["mesos_master_image"],
            entrypoint="bash /files/run_mesos_master.sh",
            detach=True,
        )
        self.cli.start(container=container.get("Id"))
        master_container = config["mesos_master_container"]
        print_utils.okgreen("started container %s" % master_container)

    def _setup_mesos_agents(self):
        """Pull the agent image and start all regular and exclusive agents."""
        config = self.config
        self.cli.pull(config['mesos_slave_image'])
        for i in range(0, config['num_agents']):
            port = self.mesos_agent_ports[i]
            self.setup_mesos_agent(i, port)
        for i in range(0, config.get('num_exclusive_agents', 0)):
            # Exclusive agent ports are stored after the regular agent ports.
            port = self.mesos_agent_ports[config['num_agents'] + i]
            self.setup_mesos_agent(
                i, port,
                is_exclusive=True,
                exclusive_label_value=config.get('exclusive_label_value', ''))

    def _create_cassandra_store(self):
        """
        Run the cassandra setup script until the test keyspace shows up.

        Exits the process with status 1 when the keyspace could not be
        created within utils.max_retry_attempts attempts.
        """
        config = self.config
        retry_attempts = 0
        while retry_attempts < utils.max_retry_attempts:
            time.sleep(utils.sleep_time_secs)
            setup_exe = self.cli.exec_create(
                container=config["cassandra_container"],
                cmd="/files/setup_cassandra.sh",
            )
            show_exe = self.cli.exec_create(
                container=config["cassandra_container"],
                cmd='cqlsh -e "describe %s"' % config["cassandra_test_db"],
            )
            # by api design, exec_start needs to be called after exec_create
            # to run 'docker exec'
            resp = self.cli.exec_start(exec_id=setup_exe)
            if resp == "":
                # Empty output is treated as a successful setup run; confirm
                # by describing the keyspace.
                resp = self.cli.exec_start(exec_id=show_exe)
                if "CREATE KEYSPACE peloton_test WITH" in resp:
                    print_utils.okgreen("cassandra store is created")
                    return

            if retry_attempts % 5 == 1:
                print_utils.warn("failed to create c* store, retrying...")
            retry_attempts += 1

        print_utils.fail(
            "Failed to create cassandra store after %d attempts, "
            "aborting..." % utils.max_retry_attempts
        )
        sys.exit(1)

    def _setup_peloton(self):
        """Start every Peloton app, in APP_START_ORDER, that is not disabled."""
        print_utils.okblue(
            'docker image "uber/peloton" has to be built first '
            "locally by running IMAGE=uber/peloton make docker"
        )

        # items() works on both Python 2 and Python 3 (iteritems does not).
        for app, func in self.APP_START_ORDER.items():
            if app in self.disabled_applications:
                should_disable = self.disabled_applications[app]
                if should_disable:
                    continue
            func()

    def _teardown_peloton(self, stop):
        """
        Stop or remove every Peloton application container.

        :param stop: when True containers are stopped, otherwise removed
        """
        config = self.config
        if stop:
            # Stop existing container
            func = self.cli.stop_container
        else:
            # Remove existing container
            func = self.cli.remove_existing_container

        def teardown_instances(instance_count, container_key):
            # Container names are the configured prefix plus the index.
            for i in range(0, instance_count):
                func(config[container_key] + repr(i))

        # 1 - Remove jobmgr instances
        teardown_instances(config["peloton_jobmgr_instance_count"],
                           "peloton_jobmgr_container")

        # 2 - Remove placement engine instances
        teardown_instances(len(config["peloton_placement_instances"]),
                           "peloton_placement_container")

        # 3 - Remove resmgr instances
        teardown_instances(config["peloton_resmgr_instance_count"],
                           "peloton_resmgr_container")

        # 4 - Remove hostmgr instances
        teardown_instances(config["peloton_hostmgr_instance_count"],
                           "peloton_hostmgr_container")

        # 5 - Remove archiver instances
        teardown_instances(config["peloton_archiver_instance_count"],
                           "peloton_archiver_container")

        # 6 - Remove aurorabridge instances
        teardown_instances(config["peloton_aurorabridge_instance_count"],
                           "peloton_aurorabridge_container")

        # 7 - Remove apiserver instances
        teardown_instances(config["peloton_apiserver_instance_count"],
                           "peloton_apiserver_container")

        # 8 - Remove mock-cqos instances
        teardown_instances(config["peloton_mock_cqos_instance_count"],
                           "peloton_mock_cqos_container")

    # Run peloton resmgr app
    def run_peloton_resmgr(self):
        """Recreate and start every resmgr instance."""
        env = {}
        if self.enable_k8s:
            env.update({"HOSTMGR_API_VERSION": "v1alpha"})

        # TODO: move docker run logic into a common function for all
        # apps to share
        config = self.config
        for i in range(0, config["peloton_resmgr_instance_count"]):
            ports = self.resmgr_ports[i]
            name = config["peloton_resmgr_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait(
                "resmgr",
                name,
                ports,
                extra_env=env,
            )

    # Run peloton hostmgr app
    def run_peloton_hostmgr(self):
        """Recreate and start every hostmgr instance."""
        config = self.config
        scarce_resource = ",".join(config["scarce_resource_types"])
        slack_resource = ",".join(config["slack_resource_types"])
        mounts = []
        env = {
            "SCARCE_RESOURCE_TYPES": scarce_resource,
            "SLACK_RESOURCE_TYPES": slack_resource,
            # Always enable host pool in hostmgr of mini cluster.
            "ENABLE_HOST_POOL": True,
        }
        if self.enable_k8s:
            # Expose the directory holding the kubeconfig inside the container.
            kubeconfig_dir = os.path.dirname(self.k8s.get_kubeconfig())
            mounts = [kubeconfig_dir + ":/.kube"]
            env.update({
                "ENABLE_K8S": True,
                "KUBECONFIG": "/.kube/kind-config-peloton-k8s",
            })

        for i in range(0, config["peloton_hostmgr_instance_count"]):
            ports = self.hostmgr_ports[i]
            name = config["peloton_hostmgr_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait(
                "hostmgr",
                name,
                ports,
                extra_env=env,
                mounts=mounts,
            )

    # Run peloton jobmgr app
    def run_peloton_jobmgr(self):
        """Recreate and start every jobmgr instance."""
        config = self.config
        env = {
            "MESOS_AGENT_WORK_DIR": config["work_dir"],
            "JOB_TYPE": os.getenv("JOB_TYPE", "BATCH"),
        }
        if self.enable_k8s:
            env.update({"HOSTMGR_API_VERSION": "v1alpha"})

        for i in range(0, config["peloton_jobmgr_instance_count"]):
            ports = self.jobmgr_ports[i]
            name = config["peloton_jobmgr_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait(
                "jobmgr",
                name,
                ports,
                extra_env=env,
            )

    # Run peloton aurora bridge app
    def run_peloton_aurorabridge(self):
        """Recreate and start every aurorabridge instance."""
        config = self.config
        for i in range(0, config["peloton_aurorabridge_instance_count"]):
            ports = self.aurorabridge_ports[i]
            name = config["peloton_aurorabridge_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait("aurorabridge", name, ports)

    # Run peloton placement app
    def run_peloton_placement(self):
        """Recreate and start one placement engine per configured task type."""
        config = self.config
        for i, task_type in enumerate(config["peloton_placement_instances"]):
            ports = self.placement_ports[i]
            name = config["peloton_placement_container"] + repr(i)
            self.cli.remove_existing_container(name)
            # BATCH uses the plain app type; every other task type gets a
            # lower-cased suffix.
            if task_type == 'BATCH':
                app_type = 'placement'
            else:
                app_type = 'placement_' + task_type.lower()
            env = {
                "APP_TYPE": app_type,
                "TASK_TYPE": task_type,
                "USE_HOST_POOL": config.get("use_host_pool", False),
            }
            if self.enable_k8s:
                env.update({"HOSTMGR_API_VERSION": "v1alpha"})
            self.start_and_wait(
                "placement",
                name,
                ports,
                extra_env=env,
            )

    # Run peloton api server
    def run_peloton_apiserver(self):
        """Recreate and start every apiserver instance."""
        config = self.config
        for i in range(0, config["peloton_apiserver_instance_count"]):
            ports = self.apiserver_ports[i]
            name = config["peloton_apiserver_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait("apiserver", name, ports)

    # Run peloton mock-cqos server
    def run_peloton_mockcqos(self):
        """Recreate and start every mock-cqos instance."""
        config = self.config
        for i in range(0, config["peloton_mock_cqos_instance_count"]):
            ports = self.mockcqos_ports[i]
            name = config["peloton_mock_cqos_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait("mock-cqos", name, ports)

    # Run peloton archiver app
    def run_peloton_archiver(self):
        """Recreate and start every archiver instance."""
        config = self.config
        for i in range(0, config["peloton_archiver_instance_count"]):
            ports = self.archiver_ports[i]
            name = config["peloton_archiver_container"] + repr(i)
            self.cli.remove_existing_container(name)
            self.start_and_wait(
                "archiver",
                name,
                ports,
            )

    # Starts a container and waits for it to come up
    def start_and_wait(self, application_name, container_name, ports,
                       extra_env=None, mounts=None):
        """
        Create and start a Peloton container and wait for its first port.

        :param application_name: value of the APP environment variable
        :param container_name: name and hostname of the container
        :param ports: ports to publish one-to-one on the host; ports[0] is
            passed as HTTP_PORT and, when present, ports[1] as GRPC_PORT
        :param extra_env: optional dict merged over the default environment
        :param mounts: optional list of "<host-path>:<container-path>" bind
            mounts; the list is not modified
        """
        # Work on a copy: callers reuse the same list for several instances,
        # and extending it in place would accumulate duplicate entries.
        binds = list(mounts) if mounts is not None else []

        # TODO: It's very implicit that the first port is the HTTP
        # port, perhaps we should split it out even more.
        zk_url = self.zk_url
        config = self.config
        if zk_url is not None:
            election_zk_servers = zk_url
            mesos_zk_path = "zk://{0}/mesos".format(zk_url)
        else:
            # Both values point at the local zk container.
            election_zk_servers = "{0}:{1}".format(
                self.cli.get_container_ip(config["zk_container"]),
                config["default_zk_port"],
            )
            mesos_zk_path = "zk://{0}/mesos".format(election_zk_servers)
        cass_hosts = self.cli.get_container_ip(config["cassandra_container"])
        env = {
            "CONFIG_DIR": "config",
            "APP": application_name,
            "HTTP_PORT": ports[0],
            "DB_HOST": cass_hosts,
            "ELECTION_ZK_SERVERS": election_zk_servers,
            "MESOS_ZK_PATH": mesos_zk_path,
            "MESOS_SECRET_FILE": "/files/hostmgr_mesos_secret",
            "CASSANDRA_HOSTS": cass_hosts,
            "ENABLE_DEBUG_LOGGING": config["debug"],
            "DATACENTER": "",
            # used to migrate the schema; used inside host manager
            "AUTO_MIGRATE": config["auto_migrate"],
            "CLUSTER": "minicluster",
            'AUTH_TYPE': os.getenv('AUTH_TYPE', 'NOOP'),
            'AUTH_CONFIG_FILE': os.getenv('AUTH_CONFIG_FILE'),
        }
        if len(ports) > 1:
            env["GRPC_PORT"] = ports[1]
        if extra_env:
            env.update(extra_env)
        # items() works on both Python 2 and Python 3 (iteritems does not).
        environment = ["%s=%s" % (key, value) for key, value in env.items()]

        # BIND_MOUNTS allows additional files to be mounted in the
        # container. Expected format is a comma-separated list of items
        # of the form <host-path>:<container-path>. Empty items are dropped.
        extra_mounts = os.environ.get("BIND_MOUNTS", "").split(",")
        binds.extend(mount for mount in extra_mounts if mount)

        container = self.cli.create_container(
            name=container_name,
            hostname=container_name,
            ports=[repr(port) for port in ports],
            environment=environment,
            host_config=self.cli.create_host_config(
                port_bindings={port: port for port in ports},
                binds=[work_dir + "/files:/files"] + binds,
            ),
            # pull or build peloton image if not exists
            image=config["peloton_image"],
            detach=True,
        )
        self.cli.start(container=container.get("Id"))
        utils.wait_for_up(
            container_name, ports[0]
        )  # use the first port as primary

    def peloton_client(self):
        """Return the Peloton client wrapper, creating it on first use."""
        if self._peloton_client is not None:
            return self._peloton_client

        name = self.config.get("name", "standard-minicluster")
        zk_servers = "localhost:{}".format(self.config["local_zk_port"])
        use_apiserver = os.getenv("USE_APISERVER") == 'True'
        grpc = "grpc://localhost:{}"
        # The second port of the first instance of each app is used as its
        # grpc endpoint.
        self._peloton_client = client.PelotonClientWrapper(
            name=name,
            zk_servers=zk_servers,
            enable_apiserver=use_apiserver,
            api_url=grpc.format(self.apiserver_ports[0][1]),
            jm_url=grpc.format(self.jobmgr_ports[0][1]),
            rm_url=grpc.format(self.resmgr_ports[0][1]),
            hm_url=grpc.format(self.hostmgr_ports[0][1]),
        )
        return self._peloton_client

    def wait_for_mesos_master_leader(self, timeout_secs=20):
        """
        util method to wait for mesos master leader elected

        Polls the master's state.json once per second until it answers with
        HTTP 200.

        :param timeout_secs: how long to keep polling
        :raises AssertionError: when the master did not answer in time
        """
        port = self.config.get("local_master_port")
        url = "{}:{}/state.json".format(utils.HTTP_LOCALHOST, port)
        print_utils.warn("waiting for mesos master leader")
        deadline = time.time() + timeout_secs
        while time.time() < deadline:
            try:
                resp = requests.get(url)
                if resp.status_code == 200:
                    print_utils.okgreen("mesos master is ready")
                    return
            except Exception:
                # Best effort: a failed request simply means "not ready yet".
                pass
            # Sleep after every unsuccessful attempt, including failed
            # requests, so that an unreachable master is not polled in a
            # busy loop.
            time.sleep(1)

        # Raised explicitly (not via `assert`) so the timeout is still
        # reported when Python runs with -O.
        raise AssertionError("timed out waiting for mesos master leader")

    def wait_for_all_agents_to_register(self, agent_count=3,
                                        timeout_secs=300):
        """
        util method to wait for all agents to register

        Polls the master's state.json once per second until exactly
        `agent_count` agents are reported as active. Does not raise on
        timeout; a warning is printed and the method returns.

        :param agent_count: number of active agents to wait for
        :param timeout_secs: how long to keep polling
        """
        port = self.config.get("local_master_port")
        url = "{}:{}/state.json".format(utils.HTTP_LOCALHOST, port)
        print_utils.warn("waiting for all mesos agents")

        deadline = time.time() + timeout_secs
        while time.time() < deadline:
            try:
                resp = requests.get(url)
                if resp.status_code == 200:
                    registered_agents = sum(
                        1 for a in resp.json()['slaves'] if a['active'])
                    if registered_agents == agent_count:
                        print_utils.okgreen("all mesos agents are ready")
                        return
            except Exception:
                # Best effort: a failed request or an unexpected payload
                # simply means "not ready yet".
                pass
            # Sleep after every unsuccessful attempt, including failed
            # requests, so that an unreachable master is not polled in a
            # busy loop.
            time.sleep(1)

        print_utils.warn("timed out waiting for all mesos agents")


default_cluster = Minicluster(utils.default_config())