in ambari-server/src/main/java/org/apache/ambari/server/bootstrap/BSRunner.java [150:325]
public void run() {
String hostString = createHostString(sshHostInfo.getHosts());
String user = sshHostInfo.getUser();
if (user == null || user.isEmpty()) {
user = DEFAULT_USER;
}
String commands[] = new String[11];
String shellCommand[] = new String[3];
BSStat stat = BSStat.RUNNING;
String scriptlog = "";
try {
createRunDir();
if (LOG.isDebugEnabled()) {
// FIXME needs to be removed later
// security hole
LOG.debug("Using ssh key=\""
+ sshHostInfo.getSshKey() + "\"");
}
String password = sshHostInfo.getPassword();
if (password != null && !password.isEmpty()) {
this.passwordFile = new File(this.requestIdDir, "host_pass");
// TODO : line separator should be changed
// if we are going to support multi platform server-agent solution
String lineSeparator = System.getProperty("line.separator");
password = password + lineSeparator;
writePasswordFile(password);
}
writeSshKeyFile(sshHostInfo.getSshKey());
/* Running command:
* script hostlist bsdir user sshkeyfile
*/
shellCommand[0] = "sh";
shellCommand[1] = "-c";
commands[0] = this.bsScript;
commands[1] = hostString;
commands[2] = this.requestIdDir.toString();
commands[3] = user;
commands[4] = this.sshKeyFile.toString();
commands[5] = this.agentSetupScript.toString();
commands[6] = this.ambariHostname;
commands[7] = this.clusterOsType;
commands[8] = this.projectVersion;
commands[9] = this.serverPort+"";
if (this.passwordFile != null) {
commands[10] = this.passwordFile.toString();
}
LOG.info("Host= " + hostString + " bs=" + this.bsScript + " requestDir=" +
requestIdDir + " user=" + user + " keyfile=" + this.sshKeyFile +
" passwordFile " + this.passwordFile + " server=" + this.ambariHostname +
" version=" + projectVersion + " serverPort=" + this.serverPort);
String[] env = new String[] { "AMBARI_PASSPHRASE=" + agentSetupPassword };
if (this.verbose)
env = new String[] { env[0], " BS_VERBOSE=\"-vvv\" " };
StringBuilder commandString = new StringBuilder();
for (String comm : commands) {
commandString.append(" " + comm);
}
if (LOG.isDebugEnabled()) {
LOG.debug(commandString);
}
String bootStrapOutputFile = requestIdDir + File.separator + "bootstrap.out";
String bootStrapErrorFile = requestIdDir + File.separator + "bootstrap.err";
commandString.append(
" 1> " + bootStrapOutputFile + " 2>" + bootStrapErrorFile);
shellCommand[2] = commandString.toString();
Process process = Runtime.getRuntime().exec(shellCommand, env);
// Startup a scheduled executor service to look through the logs
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
BSStatusCollector statusCollector = new BSStatusCollector();
ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(statusCollector,
0, 10, TimeUnit.SECONDS);
LOG.info("Kicking off the scheduler for polling on logs in " +
this.requestIdDir);
try {
LOG.info("Bootstrap output, log="
+ bootStrapErrorFile + " " + bootStrapOutputFile);
int exitCode = process.waitFor();
String outMesg = "";
String errMesg = "";
try {
outMesg = FileUtils.readFileToString(new File(bootStrapOutputFile));
errMesg = FileUtils.readFileToString(new File(bootStrapErrorFile));
} catch(IOException io) {
LOG.info("Error in reading files ", io);
}
scriptlog = outMesg + "\n\n" + errMesg;
LOG.info("Script log Mesg " + scriptlog);
if (exitCode != 0) {
stat = BSStat.ERROR;
} else {
stat = BSStat.SUCCESS;
}
scheduler.schedule(new BSStatusCollector(), 0, TimeUnit.SECONDS);
long startTime = System.currentTimeMillis();
while (true) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for hosts status to be updated");
}
boolean pendingHosts = false;
BootStrapStatus tmpStatus = bsImpl.getStatus(requestId);
for (BSHostStatus status : tmpStatus.getHostsStatus()) {
if (status.getStatus().equals("RUNNING")) {
pendingHosts = true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Whether hosts status yet to be updated, pending="
+ pendingHosts);
}
if (!pendingHosts) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue
}
long now = System.currentTimeMillis();
if (now >= (startTime+15000)) {
LOG.warn("Gave up waiting for hosts status to be updated");
break;
}
}
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
handle.cancel(true);
/* schedule a last update */
scheduler.schedule(new BSStatusCollector(), 0, TimeUnit.SECONDS);
scheduler.shutdownNow();
try {
scheduler.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Interruped while waiting for scheduler");
}
process.destroy();
}
} catch(IOException io) {
LOG.info("Error executing bootstrap " + io.getMessage());
stat = BSStat.ERROR;
}
finally {
/* get the bstatus */
BootStrapStatus tmpStatus = bsImpl.getStatus(requestId);
tmpStatus.setLog(scriptlog);
tmpStatus.setStatus(stat);
bsImpl.updateStatus(requestId, tmpStatus);
bsImpl.reset();
// Remove private ssh key after bootstrap is complete
try {
FileUtils.forceDelete(sshKeyFile);
} catch (IOException io) {
LOG.warn(io.getMessage());
}
if (passwordFile != null) {
// Remove password file after bootstrap is complete
try {
FileUtils.forceDelete(passwordFile);
} catch (IOException io) {
LOG.warn(io.getMessage());
}
}
finished();
}
}