ambari-server/src/main/python/bootstrap.py (437 lines of code) (raw):

#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import socket
from threading import Thread
import time
import sys
import logging
import pprint
import os
import subprocess
import threading
import traceback
from pprint import pformat

AMBARI_PASSPHRASE_VAR_NAME = "AMBARI_PASSPHRASE"
# Seconds after which PBootstrap gives up on a single host bootstrap
HOST_BOOTSTRAP_TIMEOUT = 300
# how many parallel bootstraps may be run at a time
MAX_PARALLEL_BOOTSTRAPS = 20
# How many seconds to wait between polling parallel bootstraps
POLL_INTERVAL_SEC = 1
# When True, the full scp/ssh command lines are written to the host logs
DEBUG = False


class HostLog:
  """ Provides per-host logging. """

  def __init__(self, log_file):
    self.log_file = log_file

  def write(self, log_text):
    """
    Appends str(log_text) to the log file, terminated by a newline.
    The file is opened and closed on every write so that the content is
    accessible to the poller in ambari-server right away.
    """
    text = str(log_text)
    if not text.endswith("\n"):
      text += "\n"
    with open(self.log_file, "a+") as log_file:
      log_file.write(text)


class SCP:
  """
  Copies one local file to a remote host with the scp command line tool.
  run() blocks until scp exits, writes scp's output to the host log and
  returns scp's exit code.
  """

  def __init__(self, user, sshkey_file, host, inputFile, remote, bootdir, host_log):
    self.user = user
    self.sshkey_file = sshkey_file
    self.host = host
    self.inputFile = inputFile
    self.remote = remote
    self.bootdir = bootdir
    self.host_log = host_log

  def run(self):
    """ Runs scp and returns its exit code. """
    scpcommand = ["scp",
                  "-o", "ConnectTimeout=60",
                  "-o", "BatchMode=yes",
                  "-o", "StrictHostKeyChecking=no",
                  "-i", self.sshkey_file, self.inputFile,
                  self.user + "@" + self.host + ":" + self.remote]
    if DEBUG:
      self.host_log.write("Running scp command " + ' '.join(scpcommand))
    scpstat = subprocess.Popen(scpcommand, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    # NOTE: concatenating communicate() output with str assumes Python 2,
    # where it returns str (on Python 3 it returns bytes).
    log = scpstat.communicate()
    log = "STDOUT\n" + log[0] + "\nSTDERR\n" + log[1]
    self.host_log.write(log)
    self.host_log.write("scp " + self.inputFile + " done for host " + self.host +
                        ", exitcode=" + str(scpstat.returncode))
    return scpstat.returncode


class SSH:
  """
  Runs one command on a remote host over ssh.
  run() blocks until ssh exits, writes the output to the host log and returns
  ssh's exit code. If errorMessage is given, it is prepended to the logged
  stderr when the command exits with a non-zero code.
  """

  def __init__(self, user, sshkey_file, host, command, bootdir, host_log, errorMessage = None):
    self.user = user
    self.sshkey_file = sshkey_file
    self.host = host
    self.command = command
    self.bootdir = bootdir
    self.errorMessage = errorMessage
    self.host_log = host_log

  def run(self):
    """ Runs the remote command and returns ssh's exit code. """
    sshcommand = ["ssh",
                  "-o", "ConnectTimeOut=60",
                  "-o", "StrictHostKeyChecking=no",
                  "-o", "BatchMode=yes",
                  "-tt", # Should prevent "tput: No value for $TERM and no -T specified" warning
                  "-i", self.sshkey_file,
                  self.user + "@" + self.host, self.command]
    if DEBUG:
      self.host_log.write("Running ssh command " + ' '.join(sshcommand))
    sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    # NOTE: str concatenation below assumes Python 2 (see SCP.run)
    log = sshstat.communicate()
    errorMsg = log[1]
    if self.errorMessage and sshstat.returncode != 0:
      errorMsg = self.errorMessage + "\n" + errorMsg
    log = "STDOUT\n" + log[0] + "\nSTDERR\n" + errorMsg
    self.host_log.write(log)
    self.host_log.write("SSH command execution finished for host " + self.host +
                        ", exitcode=" + str(sshstat.returncode))
    return sshstat.returncode


class Bootstrap(threading.Thread):
  """ Bootstrap the agent on a separate host"""
  TEMP_FOLDER = "/tmp"
  OS_CHECK_SCRIPT_FILENAME = "os_type_check.sh"
  AMBARI_REPO_FILENAME = "ambari.repo"
  SETUP_SCRIPT_FILENAME = "setupAgent.py"
  PASSWORD_FILENAME = "host_pass"

  def __init__(self, host, shared_state):
    threading.Thread.__init__(self)
    self.host = host
    self.shared_state = shared_state
    self.status = {
      "start_time": None,
      "return_code": None,
    }
    # Set to True by copyPasswordFile(); run() reads it to decide whether the
    # remote password file must be deleted. Initialized here so that run()
    # does not die with AttributeError (and never write the .done file) when
    # an action before copyPasswordFile fails.
    self.copied_password_file = False
    log_file = os.path.join(self.shared_state.bootdir, self.host + ".log")
    self.host_log = HostLog(log_file)
    self.daemon = True

  @staticmethod
  def _combine_retcodes(*retcodes):
    """
    Combines the exit codes of several steps into one.
    Returns the largest code, like max(), except that when max() would be 0
    while some step returned a negative code (Popen reports death by signal N
    as -N), the negative code is returned so the failure is not hidden.
    """
    worst = max(retcodes)
    if worst == 0:
      worst = min(retcodes)
    return worst

  def getRemoteName(self, filename):
    """
    Returns the remote path to use for filename inside TEMP_FOLDER.
    The timestamped name is generated once and cached in
    shared_state.remote_files, which is shared by all Bootstrap threads.
    """
    full_name = os.path.join(self.TEMP_FOLDER, filename)
    remote_files = self.shared_state.remote_files
    if full_name not in remote_files:
      remote_files[full_name] = self.generateRandomFileName(full_name)
    return remote_files[full_name]

  def generateRandomFileName(self, filename):
    """
    Inserts the current Unix time (seconds, see getUtime) before the
    extension of filename. Despite the name, nothing random is involved.
    Returns the bare timestamp (an int) when filename is None.
    """
    if filename is None:
      return self.getUtime()
    else:
      name, ext = os.path.splitext(filename)
      return str(name) + str(self.getUtime()) + str(ext)

  # This method is needed to implement the descriptor protocol (make object
  # to pass self reference to mockups)
  def __get__(self, obj, objtype):
    def _call(*args, **kwargs):
      self(obj, *args, **kwargs)
    return _call

  def is_suse(self):
    """
    Returns True if /etc/issue on the machine running this script mentions
    "suse" (case-insensitive). Note this inspects the local machine, not the
    remote host being bootstrapped.
    """
    if os.path.isfile("/etc/issue"):
      with open("/etc/issue") as issue_file:
        if "suse" in issue_file.read().lower():
          return True
    return False

  def getRepoDir(self):
    """ Directory that holds package repo definitions (zypper on SUSE, yum otherwise). """
    if self.is_suse():
      return "/etc/zypp/repos.d"
    else:
      return "/etc/yum.repos.d"

  def getRepoFile(self):
    """ Ambari repo file for Ambari."""
    return os.path.join(self.getRepoDir(), self.AMBARI_REPO_FILENAME)

  def getOsCheckScript(self):
    """ Local path of the os type check script. """
    return os.path.join(self.shared_state.script_dir, self.OS_CHECK_SCRIPT_FILENAME)

  def getOsCheckScriptRemoteLocation(self):
    """ Remote path the os type check script is copied to. """
    return self.getRemoteName(self.OS_CHECK_SCRIPT_FILENAME)

  def getUtime(self):
    """ Current Unix time in whole seconds. """
    return int(time.time())

  def getPasswordFile(self):
    """ Remote path the sudo password file is copied to. """
    return self.getRemoteName(self.PASSWORD_FILENAME)

  def hasPassword(self):
    """ True if a password file was supplied (None and 'null' mean no password). """
    password_file = self.shared_state.password_file
    return password_file is not None and password_file != 'null'

  def copyOsCheckScript(self):
    """ Copies the os type check script to the host; returns the scp exit code. """
    fileToCopy = self.getOsCheckScript()
    target = self.getOsCheckScriptRemoteLocation()
    params = self.shared_state
    scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
              target, params.bootdir, self.host_log)
    result = scp.run()
    self.host_log.write("Copying os type check script finished")
    return result

  def getMoveRepoFileWithPasswordCommand(self, targetDir):
    """ 'sudo -S mv' of the uploaded repo file, feeding the password file to sudo's stdin. """
    return "sudo -S mv " + str(self.getRemoteName(self.AMBARI_REPO_FILENAME)) \
      + " " + os.path.join(str(targetDir), self.AMBARI_REPO_FILENAME) + \
      " < " + str(self.getPasswordFile())

  def getMoveRepoFileWithoutPasswordCommand(self, targetDir):
    """ Plain 'sudo mv' of the uploaded repo file into targetDir. """
    return "sudo mv " + str(self.getRemoteName(self.AMBARI_REPO_FILENAME)) \
      + " " + os.path.join(str(targetDir), self.AMBARI_REPO_FILENAME)

  def getMoveRepoFileCommand(self, targetDir):
    """ Remote command that moves the uploaded repo file into targetDir. """
    if self.hasPassword():
      return self.getMoveRepoFileWithPasswordCommand(targetDir)
    else:
      return self.getMoveRepoFileWithoutPasswordCommand(targetDir)

  def copyNeededFiles(self):
    """
    Uploads the repo file, moves it into the repo dir and uploads the agent
    setup script. All three steps are attempted; the combined exit code is
    non-zero if any of them failed.
    """
    # Copying the files
    fileToCopy = self.getRepoFile()
    target = self.getRemoteName(self.AMBARI_REPO_FILENAME)

    self.host_log.write("Copying repo file to 'tmp' folder...")
    params = self.shared_state
    scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
              target, params.bootdir, self.host_log)
    retcode1 = scp.run()

    # Move file to repo dir
    self.host_log.write("Moving file to repo dir...")
    targetDir = self.getRepoDir()
    command = self.getMoveRepoFileCommand(targetDir)
    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log)
    retcode2 = ssh.run()

    self.host_log.write("Copying setup script file...")
    fileToCopy = params.setup_agent_file
    target = self.getRemoteName(self.SETUP_SCRIPT_FILENAME)
    scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
              target, params.bootdir, self.host_log)
    retcode3 = scp.run()

    self.host_log.write("Copying files finished")
    return self._combine_retcodes(retcode1, retcode2, retcode3)

  def getAmbariVersion(self):
    """ Ambari version to pass to the setup script; "" when unset or 'null'. """
    ambari_version = self.shared_state.ambari_version
    if ambari_version is None or ambari_version == "null":
      return ""
    else:
      return ambari_version

  def getAmbariPort(self):
    """ Server port to pass to the setup script; the string "null" when unset. """
    server_port = self.shared_state.server_port
    if server_port is None or server_port == "null":
      return "null"
    else:
      return server_port

  def getRunSetupWithPasswordCommand(self, expected_hostname):
    """
    'sudo -S python <setup script> ...' command, feeding the password file to
    sudo's stdin. Raises KeyError if AMBARI_PASSPHRASE is not set.
    NOTE(review): the passphrase is passed as a command line argument on the
    remote host; consider whether that exposure is acceptable.
    """
    setupFile = self.getRemoteName(self.SETUP_SCRIPT_FILENAME)
    passphrase = os.environ[AMBARI_PASSPHRASE_VAR_NAME]
    server = self.shared_state.ambari_server
    version = self.getAmbariVersion()
    port = self.getAmbariPort()
    passwordFile = self.getPasswordFile()
    return "sudo -S python " + str(setupFile) + " " + str(expected_hostname) + \
      " " + str(passphrase) + " " + str(server) + " " + str(version) + \
      " " + str(port) + " < " + str(passwordFile)

  def getRunSetupWithoutPasswordCommand(self, expected_hostname):
    """
    'sudo python <setup script> ...' command.
    Raises KeyError if AMBARI_PASSPHRASE is not set.
    """
    setupFile = self.getRemoteName(self.SETUP_SCRIPT_FILENAME)
    passphrase = os.environ[AMBARI_PASSPHRASE_VAR_NAME]
    server = self.shared_state.ambari_server
    version = self.getAmbariVersion()
    port = self.getAmbariPort()
    return "sudo python " + str(setupFile) + " " + str(expected_hostname) + \
      " " + str(passphrase) + " " + str(server) + " " + str(version) + \
      " " + str(port)

  def getRunSetupCommand(self, expected_hostname):
    """ Remote command that runs the agent setup script. """
    if self.hasPassword():
      return self.getRunSetupWithPasswordCommand(expected_hostname)
    else:
      return self.getRunSetupWithoutPasswordCommand(expected_hostname)

  def runOsCheckScript(self):
    """ Makes the uploaded os check script executable and runs it with the cluster os type. """
    params = self.shared_state
    self.host_log.write("Running os type check...")
    command = "chmod a+x %s && %s %s" % \
      (self.getOsCheckScriptRemoteLocation(),
       self.getOsCheckScriptRemoteLocation(), params.cluster_os_type)

    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log)
    retcode = ssh.run()
    self.host_log.write("Running os type check finished")
    return retcode

  def runSetupAgent(self):
    """ Runs the agent setup script on the host; returns the ssh exit code. """
    params = self.shared_state
    self.host_log.write("Running setup agent...")
    command = self.getRunSetupCommand(self.host)
    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log)
    retcode = ssh.run()
    self.host_log.write("Setting up agent finished")
    return retcode

  def createDoneFile(self, retcode):
    """ Creates .done file for current host. These files are later read from Java code.
    If .done file for any host is not created, the bootstrap will hang or fail due to timeout.
    An existing .done file is never overwritten."""
    params = self.shared_state
    doneFilePath = os.path.join(params.bootdir, self.host + ".done")
    if not os.path.exists(doneFilePath):
      with open(doneFilePath, "w+") as doneFile:
        doneFile.write(str(retcode))

  def checkSudoPackage(self):
    """ Checking 'sudo' package on remote host """
    params = self.shared_state
    command = "rpm -qa | grep sudo"
    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log,
              errorMessage="Error: Sudo command is not available. " \
                           "Please install the sudo command.")
    retcode = ssh.run()
    self.host_log.write("Checking 'sudo' package finished")
    return retcode

  def copyPasswordFile(self):
    """
    Uploads the password file and chmods it to 600 on the host.
    Marks the file as copied (so run() later deletes it) once scp was attempted.
    """
    # Copy the password file
    self.host_log.write("Copying password file to 'tmp' folder...")
    params = self.shared_state
    scp = SCP(params.user, params.sshkey_file, self.host, params.password_file,
              self.getPasswordFile(), params.bootdir, self.host_log)
    retcode1 = scp.run()

    self.copied_password_file = True

    # Change password file mode to 600
    self.host_log.write("Changing password file mode...")
    command = "chmod 600 " + self.getPasswordFile()
    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log)
    retcode2 = ssh.run()

    self.host_log.write("Copying password file finished")
    return self._combine_retcodes(retcode1, retcode2)

  def changePasswordFileModeOnHost(self):
    """ Changes the mode of the uploaded password file to 600. """
    self.host_log.write("Changing password file mode...")
    params = self.shared_state
    command = "chmod 600 " + self.getPasswordFile()
    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log)
    retcode = ssh.run()
    self.host_log.write("Change password file mode on host finished")
    return retcode

  def deletePasswordFile(self):
    """ Removes the uploaded password file from the host. """
    self.host_log.write("Deleting password file...")
    params = self.shared_state
    command = "rm " + self.getPasswordFile()
    ssh = SSH(params.user, params.sshkey_file, self.host, command,
              params.bootdir, self.host_log)
    retcode = ssh.run()
    self.host_log.write("Deleting password file finished")
    return retcode

  def try_to_execute(self, action):
    """
    Calls action() and returns its exit code. Any exception is logged with
    its traceback to the host log and reported as exit code 177.
    """
    try:
      last_retcode = action()
    except Exception:
      self.host_log.write("Traceback: " + traceback.format_exc())
      last_retcode = 177
    return last_retcode

  def run(self):
    """ Copy files and run commands on remote host """
    self.status["start_time"] = time.time()
    # Population of action queue
    action_queue = [self.copyOsCheckScript,
                    self.runOsCheckScript,
                    self.checkSudoPackage
    ]
    if self.hasPassword():
      action_queue.extend([self.copyPasswordFile,
                           self.changePasswordFileModeOnHost])
    action_queue.extend([
      self.copyNeededFiles,
      self.runSetupAgent,
    ])

    # Execution of action queue: stop at the first action that fails
    last_retcode = 0
    while action_queue and last_retcode == 0:
      action = action_queue.pop(0)
      last_retcode = self.try_to_execute(action)

    # Checking execution result
    if last_retcode != 0:
      message = "ERROR: Bootstrap of host {0} fails because previous action " \
        "finished with non-zero exit code ({1})".format(self.host, last_retcode)
      self.host_log.write(message)
      logging.error(message)
    # Try to delete password file
    if self.hasPassword() and self.copied_password_file:
      retcode = self.try_to_execute(self.deletePasswordFile)
      if retcode != 0:
        message = "WARNING: failed to delete password file " \
          "at {0}. Please delete it manually".format(self.getPasswordFile())
        self.host_log.write(message)
        logging.warning(message)

    self.createDoneFile(last_retcode)
    self.status["return_code"] = last_retcode

  def getStatus(self):
    """ The live status dict: 'start_time' and 'return_code' (None until set). """
    return self.status

  def interruptBootstrap(self):
    """
    Thread is not really interrupted (moreover, Python seems to have no any
    stable/portable/official api to do that: _Thread__stop only marks thread
    as stopped). The bootstrap thread is marked as a daemon at init, and will
    exit when the main parallel bootstrap thread exits.
    All we need to do now is a proper logging and creating .done file
    """
    self.host_log.write("Bootstrap timed out")
    self.createDoneFile(199)


class PBootstrap:
  """ BootStrapping the agents on a list of hosts"""
  def __init__(self, hosts, sharedState):
    self.hostlist = hosts
    self.sharedState = sharedState

  def run_bootstrap(self, host):
    """ Starts a Bootstrap thread for host and returns it. """
    bootstrap = Bootstrap(host, self.sharedState)
    bootstrap.start()
    return bootstrap

  def run(self):
    """ Run up to MAX_PARALLEL_BOOTSTRAPS at a time in parallel """
    logging.info("Executing parallel bootstrap")
    queue = list(self.hostlist)
    queue.reverse()  # so that pop() takes hosts in their original order
    running_list = []
    finished_list = []
    # Loop until the queue is empty and no bootstrap is still running
    while queue or running_list:
      # poll running bootstraps
      for bootstrap in running_list:
        status = bootstrap.getStatus()
        if status["return_code"] is not None:
          finished_list.append(bootstrap)
          continue
        starttime = status["start_time"]
        # The thread sets start_time at the start of its run(); until it has
        # done so it cannot have timed out.
        if starttime is None:
          continue
        elapsedtime = time.time() - starttime
        if elapsedtime > HOST_BOOTSTRAP_TIMEOUT:
          # bootstrap timed out
          logging.warning("Bootstrap at host {0} timed out and will be "
                          "interrupted".format(bootstrap.host))
          bootstrap.interruptBootstrap()
          finished_list.append(bootstrap)
      # Remove finished from the running list
      running_list[:] = [b for b in running_list if b not in finished_list]
      # Start new bootstraps from the queue
      free_slots = MAX_PARALLEL_BOOTSTRAPS - len(running_list)
      for _ in range(free_slots):
        if queue:
          next_host = queue.pop()
          bootstrap = self.run_bootstrap(next_host)
          running_list.append(bootstrap)
      time.sleep(POLL_INTERVAL_SEC)
    logging.info("Finished parallel bootstrap")


class SharedState:
  """ Parameters and mutable state shared by all Bootstrap threads of one run. """
  def __init__(self, user, sshkey_file, script_dir, boottmpdir, setup_agent_file,
               ambari_server, cluster_os_type, ambari_version, server_port,
               password_file = None):
    self.hostlist_to_remove_password_file = None
    self.user = user
    self.sshkey_file = sshkey_file
    self.bootdir = boottmpdir
    self.script_dir = script_dir
    self.setup_agent_file = setup_agent_file
    self.ambari_server = ambari_server
    self.cluster_os_type = cluster_os_type
    self.ambari_version = ambari_version
    self.password_file = password_file
    self.statuses = None
    self.server_port = server_port
    # remote file name cache used by Bootstrap.getRemoteName
    self.remote_files = {}
    self.ret = {}


def main(argv=None):
  """
  Entry point. argv follows the sys.argv convention (argv[0] is the script
  path, used to locate the helper scripts) and defaults to sys.argv.
  The first nine positional arguments are required; <passwordFile> is
  optional. Exits with code 2 and a usage message when arguments are missing.
  """
  if argv is None:
    argv = sys.argv
  scriptDir = os.path.realpath(os.path.dirname(argv[0]))
  onlyargs = argv[1:]
  if len(onlyargs) < 9:
    sys.stderr.write("Usage: <comma separated hosts> "
                     "<tmpdir for storage> <user> <sshkey_file> <agent setup script>"
                     " <ambari-server name> <cluster os type> <ambari version> <ambari port> <passwordFile>\n")
    sys.exit(2)

  #Parse the input
  hostList = onlyargs[0].split(",")
  bootdir = onlyargs[1]
  user = onlyargs[2]
  sshkey_file = onlyargs[3]
  setupAgentFile = onlyargs[4]
  ambariServer = onlyargs[5]
  cluster_os_type = onlyargs[6]
  ambariVersion = onlyargs[7]
  server_port = onlyargs[8]
  passwordFile = onlyargs[9] if len(onlyargs) > 9 else None

  # ssh doesn't like open files. Wait for chmod to finish so the permissions
  # are in place before any ssh/scp command is started.
  subprocess.call(["chmod", "600", sshkey_file])
  if passwordFile is not None and passwordFile != 'null':
    subprocess.call(["chmod", "600", passwordFile])

  logging.info("BootStrapping hosts " + pprint.pformat(hostList) +
               " using " + scriptDir + " cluster primary OS: " + cluster_os_type +
               " with user '" + user + "' sshKey File " + sshkey_file + " password File " + str(passwordFile) +
               " using tmp dir " + bootdir + " ambari: " + ambariServer + "; server_port: " + server_port +
               "; ambari version: " + ambariVersion)
  sharedState = SharedState(user, sshkey_file, scriptDir, bootdir, setupAgentFile,
                            ambariServer, cluster_os_type, ambariVersion,
                            server_port, passwordFile)
  pbootstrap = PBootstrap(hostList, sharedState)
  pbootstrap.run()
  return 0 # Hack to comply with current usage


if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  main(sys.argv)