aios/tools/hape/hape_libs/commands/validate_cmd.py (113 lines of code) (raw):

# -*- coding: utf-8 -*-
"""`validate` command: sanity-check the hape environment and domain config.

Depending on the configured processor mode, this either checks that docker
containers can be created on / scheduled between the candidate hosts, or that
the k8s cluster is reachable and the service names are acceptable.
"""

from platform import processor
from .common import *
import traceback
import click
import getpass
import os
import re

from hape_libs.utils.logger import Logger
from hape_libs.utils.shell import SSHShell
from hape_libs.common import HapeCommon
from hape_libs.appmaster.k8s.k8s_client import *
from hape_libs.appmaster.docker.docker_util import *


# Wraps a command so that it is executed on another host through a
# non-interactive ssh hop (no password prompt, no host-key confirmation).
_SSH_HOP_TEMPLATE = "ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no {} '{}'"


def _over_ssh(target_ip, command):
    """Return `command` wrapped so that it runs on `target_ip` via ssh."""
    return _SSH_HOP_TEMPLATE.format(target_ip, command)


@click.command(short_help='Validate hape env and domain conf')
@common_params
def validate(**kwargs):
    """Validate the basic hape environment for the configured processor mode.

    Raises:
        RuntimeError: if the hape config cannot be initialised, or if any of
            the mode-specific checks fails.
    """
    try:
        havenask_domain = command_init(kwargs)
    except Exception:
        # Log the full traceback, then surface a user-oriented hint.
        Logger.error(traceback.format_exc())
        raise RuntimeError("Failed to init hape config, maybe has wrong domain config argument or failed to access zfs/hdfs/local path")

    processorMode = havenask_domain.global_config.common.processorMode
    # The two modes are mutually exclusive; any other value runs no checks.
    if processorMode == "docker":
        validate_docker_mode(havenask_domain.domain_config)
    elif processorMode == "k8s":
        validate_k8s_mode(havenask_domain.domain_config)
    # validate_port(domain_config)
    Logger.info("Succeed to validate basic hape enviroment")


def validate_user():
    """Check that the current account is usable for docker mode.

    Raises:
        RuntimeError: if the current user is `root`, or if the user's home
            directory does not start with `/home`.
    """
    user = getpass.getuser()
    not_supported_users = ['root']
    if user in not_supported_users:
        raise RuntimeError("account:{} is not supported, please use anothor acount".format(user))

    home = os.path.expanduser("~")
    if not home.startswith("/home"):
        raise RuntimeError("please make sure account:{} has home directory".format(user))


def validate_single(key, ip, domain_config):
    """Check that the image for `key` exists on `ip` and a container can be created.

    Args:
        key: appmaster key used to look up the image in the global config.
        ip: host on which the image lookup and container creation are attempted.
        domain_config: object exposing `global_config`.

    Raises:
        RuntimeError: if the image is not found or the container cannot be created.
    """
    Logger.info("Begin to validate single container")
    global_config = domain_config.global_config

    ## check if image exists and container can be created
    image = global_config.get_appmaster_base_config(key, "image")
    container_name = "havenask-validate-single"
    image_parts = image.split(":")

    shell = SSHShell(ip)
    # "repo:tag" becomes the grep pattern "repo.*tag"; with grep_text given,
    # the result is unpacked as an (output, success) pair.
    _, succ = shell.execute_command("docker images | grep {}".format(".*".join(image_parts)), grep_text=image_parts[0])
    if not succ:
        raise RuntimeError("Image {} not found in ip {}".format(image, ip))

    homedir = global_config.default_variables.user_home
    user = global_config.default_variables.user
    workdir = os.path.join(homedir, container_name)
    succ = DockerContainerUtil.create_container(ip=ip, name=container_name, workdir=workdir, homedir=homedir,
                                                user=user, cpu=100, mem=5120, image=image)
    if not succ:
        raise RuntimeError("Failed to create container")

    DockerContainerUtil.stop_container(ip=ip, name=container_name)
    Logger.info("Succeed to validate single container")


## check each admin can schedule workers
def validate_schedule(key, admin_ip, worker_ip, domain_config):
    """Check that `admin_ip` can start a container on `worker_ip` over ssh.

    From the admin host, a probe container is started on the worker, its
    presence is verified with `docker ps`, and it is then force-removed.

    Args:
        key: appmaster key used to look up the image in the global config.
        admin_ip: host the ssh hop originates from.
        worker_ip: host the probe container is started on.
        domain_config: object exposing `global_config`.

    Raises:
        RuntimeError: if the probe container is not listed on the worker.
    """
    Logger.info("Begin to schedule from {} to {}".format(admin_ip, worker_ip))
    container_name = "havenask-validate-schedule"
    shell = SSHShell(admin_ip)
    global_config = domain_config.global_config
    user_home = global_config.default_variables.user_home
    image = global_config.get_appmaster_base_config(key, "image")

    run_cmd = (
        "docker run --workdir {} --volume=\"/etc/group:/etc/group:ro\""
        " --volume=\"/etc/passwd:/etc/passwd:ro\" --volume=\"/etc/shadow:/etc/shadow:ro\""
        " -v {}:{} --ulimit nofile=655350:655350"
        " --ulimit memlock=-1 --ulimit core=-1 --network=host --privileged -d --cpu-quota=500000"
        " --cpu-period=100000 --memory=5124m -v /etc/passwd:/home/.passwd -v /etc/group:/home/.group"
        " --name {} {} /sbin/init"
    ).format(user_home, user_home, user_home, container_name, image)
    shell.execute_command(_over_ssh(worker_ip, run_cmd))

    # The quadrupled braces collapse to the literal Go template `{{.Names}}`.
    check_cmd = "docker ps --format {{{{.Names}}}} | grep ^{}$".format(container_name)
    # NOTE(review): without grep_text the result is used as a plain string here,
    # unlike the (out, succ) pair unpacked in validate_single -- confirm against SSHShell.
    out = shell.execute_command(_over_ssh(worker_ip, check_cmd))
    if out.find(container_name) == -1:
        raise RuntimeError("Failed to schedule container from {} to {}".format(admin_ip, worker_ip))

    remove_cmd = "docker rm -f {}".format(container_name)
    shell.execute_command(_over_ssh(worker_ip, remove_cmd))
    Logger.info("Succeed to schedule from {} to {}".format(admin_ip, worker_ip))


def validate_docker_mode(domain_config):
    """Run all docker-mode checks: user account, per-host container, admin-to-worker scheduling.

    Each host is container-checked once (with the first key that mentions it),
    and each (admin, worker) pair is schedule-checked once.

    Raises:
        RuntimeError: propagated from any failing check.
    """
    validate_user()
    global_config = domain_config.global_config
    keys = [HapeCommon.SWIFT_KEY, HapeCommon.HAVENASK_KEY, HapeCommon.BS_KEY]

    # Pass 1: every candidate host must be able to create a container.
    checked_ips = set()
    for key in keys:
        admin_iplist = global_config.get_admin_candidate_ips(key)
        worker_iplist = []
        for _role, iplist in global_config.get_worker_candidate_maps(key).items():
            worker_iplist.extend(iplist)
        for ip in admin_iplist + worker_iplist:
            if ip in checked_ips:
                continue
            checked_ips.add(ip)
            validate_single(key, ip, domain_config)

    # Pass 2: every admin must be able to schedule onto its candidate workers.
    scheduled = {}  # admin ip -> set of worker ips already checked from it
    for key in keys:
        for admin_ip in global_config.get_admin_candidate_ips(key):
            seen_workers = scheduled.setdefault(admin_ip, set())
            for _role, iplist in global_config.get_worker_candidate_maps(key).items():
                for worker_ip in iplist:
                    if worker_ip in seen_workers:
                        continue
                    seen_workers.add(worker_ip)
                    validate_schedule(key, admin_ip, worker_ip, domain_config)


def validate_k8s_service_name(domain_config):
    """Check that every appmaster service name is acceptable for k8s.

    A name must be 1..253 characters long and match
    `^[a-z0-9]([-a-z0-9]*[a-z0-9])?$`.

    Raises:
        RuntimeError: if a name has a disallowed length or does not match the pattern.
    """
    pattern_str = r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$'
    pattern = re.compile(pattern_str)
    for key in [HapeCommon.HAVENASK_KEY, HapeCommon.SWIFT_KEY, HapeCommon.BS_KEY]:
        name = domain_config.global_config.get_appmaster_service_name(key)
        if len(name) < 1 or len(name) > 253:
            # Must be str.format: the message uses `{}` placeholders, so the
            # `%` operator would raise TypeError instead of this RuntimeError.
            raise RuntimeError("Length ({}) of service name [{}] is not allowed in k8s".format(len(name), name))
        if not pattern.match(name):
            raise RuntimeError("Service name [{}] is not allowed in k8s, supported pattern: {}".format(name, pattern_str))


def validate_k8s_mode(domain_config):
    """Run all k8s-mode checks: cluster connectivity, then service names.

    Raises:
        RuntimeError: if the cluster connection check fails or a service name is invalid.
    """
    global_config = domain_config.global_config
    binarypath = global_config.common.binaryPath
    kubeconfig = global_config.common.kubeConfigPath

    client = K8sClient(binarypath, kubeconfig)
    if not client.check_connections():
        msg = "Failed to connect to k8s cluster in this machine, maybe you should check kube config in [{}]".format(kubeconfig)
        raise RuntimeError(msg)

    validate_k8s_service_name(domain_config)