lib/muchos/existing.py (352 lines of code) (raw):

#!/usr/bin/env python3 # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import shutil import subprocess import time from os import path from sys import exit from os import listdir class ExistingCluster: def __init__(self, config): self.config = config def launch(self): exit( "ERROR - 'launch' command cannot be used " "when cluster_type is set to 'existing'" ) def sync(self): config = self.config print( "Syncing ansible directory on {0} cluster proxy node".format( config.cluster_name ) ) host_vars = config.ansible_host_vars() play_vars = config.ansible_play_vars() for k, v in host_vars.items(): host_vars[k] = self.config.resolve_value(k, default=v) for k, v in play_vars.items(): play_vars[k] = self.config.resolve_value(k, default=v) with open( path.join(config.deploy_path, "ansible/site.yml"), "w" ) as site_file: print("- import_playbook: common.yml", file=site_file) print("- import_playbook: zookeeper.yml", file=site_file) print("- import_playbook: hadoop.yml", file=site_file) if config.has_service("spark"): print("- import_playbook: spark.yml", file=site_file) if config.has_service("metrics"): print("- import_playbook: metrics.yml", file=site_file) print("- import_playbook: accumulo.yml", file=site_file) if config.has_service("fluo"): print("- import_playbook: fluo.yml", file=site_file) if config.has_service("fluo_yarn"): print("- import_playbook: fluo_yarn.yml", file=site_file) if config.has_service("mesosmaster"): print("- import_playbook: mesos.yml", file=site_file) if config.has_service("swarmmanager"): print("- import_playbook: docker.yml", file=site_file) if config.has_service("elkserver"): print("- import_playbook: elk.yml", file=site_file) ansible_conf = path.join(config.deploy_path, "ansible/conf") with open(path.join(ansible_conf, "hosts"), "w") as hosts_file: print( "[proxy]\n{0}".format(config.proxy_hostname()), file=hosts_file ) print("\n[accumulomaster]", file=hosts_file) for accu_host in config.get_service_hostnames("accumulomaster"): print(accu_host, file=hosts_file) print("\n[namenode]", file=hosts_file) for nn_host in config.get_service_hostnames("namenode"): print(nn_host, file=hosts_file) print("\n[journalnode]", file=hosts_file) for jn_host in config.get_service_hostnames("journalnode"): print(jn_host, file=hosts_file) print("\n[zkfc]", file=hosts_file) for zkfc_host in config.get_service_hostnames("zkfc"): print(zkfc_host, file=hosts_file) print("\n[resourcemanager]", file=hosts_file) for rm_host in config.get_service_hostnames("resourcemanager"): print(rm_host, file=hosts_file) if config.has_service("spark"): print( "\n[spark]\n{0}".format( config.get_service_hostnames("spark")[0] ), file=hosts_file, ) if config.has_service("mesosmaster"): print( "\n[mesosmaster]\n{0}".format( config.get_service_hostnames("mesosmaster")[0] ), file=hosts_file, ) if config.has_service("metrics"): print( "\n[metrics]\n{0}".format( config.get_service_hostnames("metrics")[0] ), file=hosts_file, ) if config.has_service("swarmmanager"): print( "\n[swarmmanager]\n{0}".format( config.get_service_hostnames("swarmmanager")[0] ), file=hosts_file, ) if config.has_service("elkserver"): print( "\n[elkserver]\n{0}".format( config.get_service_hostnames("elkserver")[0] ), file=hosts_file, ) print("\n[zookeepers]", file=hosts_file) for (index, zk_host) in enumerate( config.get_service_hostnames("zookeeper"), start=1 ): print("{0} id={1}".format(zk_host, index), file=hosts_file) if config.has_service("fluo"): print("\n[fluo]", file=hosts_file) for host in config.get_service_hostnames("fluo"): print(host, file=hosts_file) if config.has_service("fluo_yarn"): print("\n[fluo_yarn]", file=hosts_file) for host in config.get_service_hostnames("fluo_yarn"): print(host, file=hosts_file) print("\n[workers]", file=hosts_file) for worker_host in config.get_service_hostnames("worker"): print(worker_host, file=hosts_file) print( "\n[accumulo:children]\naccumulomaster\nworkers", file=hosts_file, ) print( "\n[hadoop:children]\nnamenode\nresourcemanager" "\nworkers\nzkfc\njournalnode", file=hosts_file, ) print("\n[nodes]", file=hosts_file) for (private_ip, hostname) in config.get_private_ip_hostnames(): print( "{0} ansible_ssh_host={1} node_type={2}".format( hostname, private_ip, config.node_type(hostname) ), file=hosts_file, ) # Call the method for Azure cluster type to write additional # specialized configs into the Ansible hosts file. if config.cluster_type == "azure": self.add_specialized_configs(hosts_file) print("\n[all:vars]", file=hosts_file) for (name, value) in sorted(host_vars.items()): print("{0} = {1}".format(name, value), file=hosts_file) with open( path.join(config.deploy_path, "ansible/group_vars/all"), "w" ) as play_vars_file: for (name, value) in sorted(play_vars.items()): print("{0}: {1}".format(name, value), file=play_vars_file) # copy keys file to ansible/conf (if it exists) conf_keys = path.join(config.deploy_path, "conf/keys") ansible_keys = path.join(ansible_conf, "keys") if path.isfile(conf_keys): shutil.copyfile(conf_keys, ansible_keys) else: open(ansible_keys, "w").close() cmd = "rsync -az --delete -e \"ssh -o 'StrictHostKeyChecking no'\"" subprocess.call( "{cmd} {src} {usr}@{ldr}:{tdir}".format( cmd=cmd, src=path.join(config.deploy_path, "ansible"), usr=config.get("general", "cluster_user"), ldr=config.get_proxy_ip(), tdir=config.user_home(), ), shell=True, ) self.exec_on_proxy_verified( "{0}/ansible/scripts/install_ansible.sh".format( config.user_home() ), opts="-t", ) def setup(self): config = self.config print("Setting up {0} cluster".format(config.cluster_name)) self.sync() conf_upload = path.join(config.deploy_path, "conf/upload") cluster_tarballs = "{0}/tarballs".format(config.user_home()) self.exec_on_proxy_verified("mkdir -p {0}".format(cluster_tarballs)) for f in listdir(conf_upload): tarball_path = path.join(conf_upload, f) if path.isfile(tarball_path) and tarball_path.endswith("gz"): self.send_to_proxy(tarball_path, cluster_tarballs) self.execute_playbook("site.yml") @staticmethod def status(): exit( "ERROR - 'status' command cannot be used " "when cluster_type is set to 'existing'" ) @staticmethod def terminate(): exit( "ERROR - 'terminate' command cannot be used " "when cluster_type is set to 'existing'" ) @staticmethod def stop(): exit( "ERROR - 'stop' command cannot be used " "when cluster_type is set to 'existing'" ) @staticmethod def start(): exit( "ERROR - 'start' command cannot be used " "when cluster_type is set to 'existing'" ) def ssh(self): self.wait_until_proxy_ready() fwd = "" if self.config.has_option("general", "proxy_socks_port"): fwd = "-D " + self.config.get("general", "proxy_socks_port") ssh_command = ( "ssh -C -A -o 'StrictHostKeyChecking no' " "{fwd} {usr}@{ldr}" ).format( usr=self.config.get("general", "cluster_user"), ldr=self.config.get_proxy_ip(), fwd=fwd, ) print("Logging into proxy using: {0}".format(ssh_command)) retcode = subprocess.call(ssh_command, shell=True) if retcode != 0: exit( "ERROR - Command failed with return code of {0}: {1}".format( retcode, ssh_command ) ) def exec_on_proxy(self, command, opts=""): ssh_command = ( "ssh -A -o 'StrictHostKeyChecking no' " "{opts} {usr}@{ldr} '{cmd}'" ).format( usr=self.config.get("general", "cluster_user"), ldr=self.config.get_proxy_ip(), cmd=command, opts=opts, ) return subprocess.call(ssh_command, shell=True), ssh_command def exec_on_proxy_verified(self, command, opts=""): (retcode, ssh_command) = self.exec_on_proxy(command, opts) if retcode != 0: exit( "ERROR - Command failed with return code of {0}: {1}".format( retcode, ssh_command ) ) def wait_until_proxy_ready(self): cluster_user = self.config.get("general", "cluster_user") print( "Checking if '{0}' proxy can be reached using: " "ssh {1}@{2}".format( self.config.proxy_hostname(), cluster_user, self.config.get_proxy_ip(), ) ) while True: (retcode, ssh_command) = self.exec_on_proxy("pwd > /dev/null") if retcode == 0: print("Connected to proxy using SSH!") time.sleep(1) break print( "Proxy could not be accessed using SSH. " "Will retry in 5 sec..." ) time.sleep(5) def execute_playbook(self, playbook): print("Executing '{0}' playbook".format(playbook)) azure_proxy_host = self.config.get("azure", "azure_proxy_host") var_azure_proxy_host = ( "_" if (azure_proxy_host is None or azure_proxy_host.strip() == "") else azure_proxy_host ) self.exec_on_proxy_verified( "time -p ansible-playbook {base}/ansible/{playbook} " "--extra-vars 'azure_proxy_host={var_azure_proxy_host}'".format( base=self.config.user_home(), playbook=playbook, var_azure_proxy_host=var_azure_proxy_host, ), opts="-t", ) def send_to_proxy(self, path, target, skip_if_exists=True): print("Copying to proxy: ", path) cmd = "scp -o 'StrictHostKeyChecking no'" if skip_if_exists: cmd = ( 'rsync --update --progress -e "ssh -q -o ' "'StrictHostKeyChecking no'\"" ) subprocess.call( "{cmd} {src} {usr}@{ldr}:{tdir}".format( cmd=cmd, src=path, usr=self.config.get("general", "cluster_user"), ldr=self.config.get_proxy_ip(), tdir=target, ), shell=True, ) def wipe(self): if not path.isfile(self.config.hosts_path): exit( "Hosts file does not exist for cluster: " + self.config.hosts_path ) print( "Killing all processes started by Muchos and " "wiping Muchos data from {0} cluster".format( self.config.cluster_name ) ) self.execute_playbook("wipe.yml") def perform(self, action): if action == "launch": self.launch() elif action == "status": self.status() elif action == "sync": self.sync() elif action == "setup": self.setup() elif action == "start": self.start() elif action == "stop": self.stop() elif action == "ssh": self.ssh() elif action == "wipe": self.wipe() elif action in ("kill", "cancel_shutdown"): if not path.isfile(self.config.hosts_path): exit( "Hosts file does not exist for cluster: " + self.config.hosts_path ) elif action == "kill": print( "Killing all processes started by Muchos " "on {0} cluster".format(self.config.cluster_name) ) elif action == "cancel_shutdown": print( "Cancelling automatic shutdown of {0} cluster".format( self.config.cluster_name ) ) self.execute_playbook(action + ".yml") elif action == "terminate": self.terminate() else: print("ERROR - Unknown action:", action)