in ambari-server/src/main/java/org/apache/ambari/server/bootstrap/BSRunner.java [176:411]
/**
 * Runs the bootstrap script for the hosts described by {@code sshHostInfo} and
 * publishes the resulting status through {@code bsImpl}.
 *
 * <p>Steps performed:
 * <ol>
 *   <li>start a periodic {@code BSStatusCollector} (every 10 seconds);</li>
 *   <li>write the optional password file and the ssh key file into the request directory;</li>
 *   <li>launch the bootstrap script with stdout/stderr redirected to
 *       {@code bootstrap.out} / {@code bootstrap.err} in the request directory;</li>
 *   <li>wait for the process up to the calculated bootstrap timeout, destroying it on timeout;</li>
 *   <li>wait up to 15 seconds for no host to be reported as {@code RUNNING};</li>
 *   <li>stop the scheduler, delete the key/password files, update the status,
 *       reset {@code bsImpl} and call {@code finished()}.</li>
 * </ol>
 *
 * <p>An {@link IOException} raised while preparing or running the script is logged,
 * marks the bootstrap as {@code ERROR} and is reported through {@code interuptSetupAgent}.
 * If the thread is interrupted, its interrupt flag is restored before cleanup continues.
 */
public void run() {
  String hostString = createHostString(sshHostInfo.getHosts());
  long bootstrapTimeout = calculateBSTimeout(sshHostInfo.getHosts().size());
  // Startup a scheduled executor service to look through the logs
  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  BSStatusCollector statusCollector = new BSStatusCollector();
  ScheduledFuture<?> handle = null;
  LOG.info("Kicking off the scheduler for polling on logs in " +
      this.requestIdDir);
  String user = sshHostInfo.getUser();
  String userRunAs = sshHostInfo.getUserRunAs();
  if (user == null || user.isEmpty()) {
    user = DEFAULT_USER;
  }
  String sshPort = sshHostInfo.getSshPort();
  if (sshPort == null || sshPort.isEmpty()) {
    sshPort = DEFAULT_SSHPORT;
  }
  String[] command = new String[13];
  BSStat stat = BSStat.RUNNING;
  String scriptlog = "";
  try {
    createRunDir();
    handle = scheduler.scheduleWithFixedDelay(statusCollector,
        0, 10, TimeUnit.SECONDS);
    // The ssh private key is intentionally NOT logged (not even at debug level):
    // writing key material to log files is a security hole.
    String password = sshHostInfo.getPassword();
    if (password != null && !password.isEmpty()) {
      this.passwordFile = new File(this.requestIdDir, "host_pass");
      // TODO : line separator should be changed
      // if we are going to support multi platform server-agent solution
      String lineSeparator = System.getProperty("line.separator");
      password = password + lineSeparator;
      writePasswordFile(password);
    }
    writeSshKeyFile(sshHostInfo.getSshKey());
    /* Running command:
     * script hostlist bsdir user sshport sshkeyfile setupscript server
     *        osfamily version serverport userrunas passwordfile
     */
    command[0] = this.bsScript;
    command[1] = hostString;
    command[2] = this.requestIdDir.toString();
    command[3] = user;
    command[4] = sshPort;
    command[5] = this.sshKeyFile.toString();
    command[6] = this.agentSetupScript.toString();
    command[7] = this.ambariHostname;
    command[8] = this.clusterOsFamily;
    command[9] = this.projectVersion;
    command[10] = this.serverPort + "";
    command[11] = userRunAs;
    // The script receives the literal string "null" when no password file exists.
    command[12] = (this.passwordFile == null) ? "null" : this.passwordFile.toString();
    Map<String, String> envVariables = new HashMap<>();
    if (System.getProperty("os.name").contains("Windows")) {
      // On Windows the script is launched through the python interpreter.
      String[] command2 = new String[command.length + 1];
      command2[0] = "python";
      System.arraycopy(command, 0, command2, 1, command.length);
      command = command2;
      Map<String, String> envVarsWin = System.getenv();
      if (envVarsWin != null) {
        envVariables.putAll(envVarsWin); //envVarsWin is non-modifiable
      }
    }
    LOG.info("Host= " + hostString + " bs=" + this.bsScript + " requestDir=" +
        requestIdDir + " user=" + user + " sshPort=" + sshPort + " keyfile=" + this.sshKeyFile +
        " passwordFile " + this.passwordFile + " server=" + this.ambariHostname +
        " version=" + projectVersion + " serverPort=" + this.serverPort + " userRunAs=" + userRunAs +
        " timeout=" + bootstrapTimeout / 1000);
    envVariables.put("AMBARI_PASSPHRASE", agentSetupPassword);
    if (this.verbose) {
      envVariables.put("BS_VERBOSE", "\"-vvv\"");
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(Arrays.toString(command));
    }
    String bootStrapOutputFilePath = requestIdDir + File.separator + "bootstrap.out";
    String bootStrapErrorFilePath = requestIdDir + File.separator + "bootstrap.err";
    ProcessBuilder pb = new ProcessBuilder(command);
    pb.redirectOutput(new File(bootStrapOutputFilePath));
    pb.redirectError(new File(bootStrapErrorFilePath));
    Map<String, String> env = pb.environment();
    env.putAll(envVariables);
    Process process = pb.start();
    try {
      String logInfoMessage = "Bootstrap output, log="
          + bootStrapErrorFilePath + " " + bootStrapOutputFilePath + " at " + this.ambariHostname;
      LOG.info(logInfoMessage);
      // Treated as a failure unless the process terminates in time with exit code 0.
      int exitCode = 1;
      boolean timedOut = false;
      if (waitForProcessTermination(process, bootstrapTimeout)) {
        exitCode = process.exitValue();
      } else {
        LOG.warn("Bootstrap process timed out. It will be destroyed.");
        process.destroy();
        timedOut = true;
      }
      String outMesg = "";
      String errMesg = "";
      try {
        outMesg = FileUtils
            .readFileToString(new File(bootStrapOutputFilePath), Charset.defaultCharset());
        errMesg = FileUtils
            .readFileToString(new File(bootStrapErrorFilePath), Charset.defaultCharset());
      } catch (IOException io) {
        // Best effort: the script log is informational only.
        LOG.info("Error in reading files ", io);
      }
      scriptlog = outMesg + "\n\n" + errMesg;
      if (timedOut) {
        scriptlog += "\n\n Bootstrap process timed out. It was destroyed.";
      }
      LOG.info("Script log Mesg " + scriptlog);
      if (exitCode != 0) {
        stat = BSStat.ERROR;
        interuptSetupAgent(99, scriptlog);
      } else {
        stat = BSStat.SUCCESS;
      }
      // Trigger an immediate status collection, then poll until no host is RUNNING.
      scheduler.schedule(new BSStatusCollector(), 0, TimeUnit.SECONDS);
      long startTime = System.currentTimeMillis();
      while (true) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Waiting for hosts status to be updated");
        }
        boolean pendingHosts = false;
        BootStrapStatus tmpStatus = bsImpl.getStatus(requestId);
        List<BSHostStatus> hostStatusList = tmpStatus.getHostsStatus();
        if (hostStatusList != null) {
          for (BSHostStatus status : hostStatusList) {
            // Constant-first comparison: a host without a status is not "pending".
            if ("RUNNING".equals(status.getStatus())) {
              pendingHosts = true;
            }
          }
        } else {
          //Failed to get host status, waiting for hosts status to be updated
          pendingHosts = true;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Whether hosts status yet to be updated, pending={}", pendingHosts);
        }
        if (!pendingHosts) {
          break;
        }
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // Preserve the interrupt for callers and stop waiting; looping on with the
          // flag set would make every subsequent sleep fail immediately.
          Thread.currentThread().interrupt();
          LOG.warn("Interrupted while waiting for hosts status to be updated");
          break;
        }
        long now = System.currentTimeMillis();
        if (now >= (startTime + 15000)) {
          LOG.warn("Gave up waiting for hosts status to be updated");
          break;
        }
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag before translating to the exception handled below.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } finally {
      if (handle != null) {
        handle.cancel(true);
      }
      /* schedule a last update */
      scheduler.schedule(new BSStatusCollector(), 0, TimeUnit.SECONDS);
      // Orderly shutdown so the update scheduled above is actually executed;
      // shutdownNow() would discard it while it is still queued.
      scheduler.shutdown();
      try {
        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
          scheduler.shutdownNow();
        }
      } catch (InterruptedException e) {
        LOG.info("Interruped while waiting for scheduler");
        scheduler.shutdownNow();
        Thread.currentThread().interrupt();
      }
      process.destroy();
    }
  } catch (IOException io) {
    LOG.info("Error executing bootstrap " + io.getMessage(), io);
    stat = BSStat.ERROR;
    interuptSetupAgent(99, io.getMessage());
  } finally {
    // If the failure happened before the process was started, the inner cleanup
    // never ran: stop the collector and its thread here so they do not leak.
    if (handle != null) {
      handle.cancel(true);
    }
    if (!scheduler.isShutdown()) {
      scheduler.shutdownNow();
    }
    /* get the bstatus */
    BootStrapStatus tmpStatus = bsImpl.getStatus(requestId);
    List<BSHostStatus> hostStatusList = tmpStatus.getHostsStatus();
    if (hostStatusList != null) {
      for (BSHostStatus hostStatus : hostStatusList) {
        if ("FAILED".equals(hostStatus.getStatus())) {
          stat = BSStat.ERROR;
          break;
        }
      }
    } else {
      stat = BSStat.ERROR;
    }
    // creating new status instance to avoid modifying exposed object
    BootStrapStatus newStat = new BootStrapStatus();
    newStat.setHostsStatus(hostStatusList);
    newStat.setLog(scriptlog);
    newStat.setStatus(stat);
    // Remove private ssh key after bootstrap is complete.
    // NOTE(review): guarded against null on the assumption that sshKeyFile is only
    // assigned by writeSshKeyFile(); an unguarded NPE here would skip the status
    // update and finished() below.
    if (sshKeyFile != null) {
      try {
        FileUtils.forceDelete(sshKeyFile);
      } catch (IOException io) {
        LOG.warn(io.getMessage());
      }
    }
    if (passwordFile != null) {
      // Remove password file after bootstrap is complete
      try {
        FileUtils.forceDelete(passwordFile);
      } catch (IOException io) {
        LOG.warn(io.getMessage());
      }
    }
    bsImpl.updateStatus(requestId, newStat);
    bsImpl.reset();
    finished();
  }
}