in manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java [290:413]
/**
 * Drives the agent installation for a single AGENT_INSTALL heartbeat event.
 *
 * <p>The event is only handled when its type is {@code AGENT_INSTALL} and its status is
 * {@code INIT}; otherwise a warning is logged and nothing else happens. The installation then
 * walks through the following stages in order, recording progress with
 * {@code updateProcessingResult} after each successful stage and recording the failure with
 * {@code updateFailResult} (followed by an immediate return) as soon as one step fails:
 * <ol>
 *   <li>ACCESS_AUTH: write the ssh key file, telnet to the ssh port, run {@code echo ok} over ssh</li>
 *   <li>INSTALL_DIR_CHECK: make sure the install directory exists on the target host</li>
 *   <li>JDK_CHECK: inspect the OS, ensure {@code ~/.bash_profile} exists on Ubuntu, check java</li>
 *   <li>AGENT_DEPLOY: scp the local agent package into the install directory</li>
 *   <li>AGENT_START: run the agent start script over ssh</li>
 * </ol>
 *
 * @param node                    the resource node being installed on; only used for log output here
 * @param configInfo              ssh connection settings, install directory and agent settings
 * @param agentInstallAgentEntity the heartbeat event whose stage/result is updated as the install proceeds
 */
private void installEventProcess(ResourceNodeEntity node, AgentInstallEventConfigInfo configInfo,
                                 HeartBeatEventEntity agentInstallAgentEntity) {
    if (!agentInstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) {
        log.warn("agent has been installed on {} node {}", node.getId(), node.getHost());
        return;
    }
    if (!agentInstallAgentEntity.getStatus().equals(HeartBeatEventResultType.INIT.name())) {
        log.warn("agent is being installed on {} node {}", node.getId(), node.getHost());
        return;
    }

    // AGENT_INSTALL heartbeat handle
    // TODO: Before each Stage operation, it is necessary to judge whether the event has been cancelled

    // ACCESS_AUTH stage
    // The key file name is derived from the event id, so each event gets its own key file.
    String sshkey = "sshkey-" + agentInstallAgentEntity.getId();
    File sshKeyFile = SSH.buildSshKeyFile(sshkey);
    SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);

    // check telnet
    if (!TelnetUtil.telnet(configInfo.getHost(), configInfo.getSshPort())) {
        log.error("can not telnet host {} port {}", configInfo.getHost(), configInfo.getSshPort());
        // TODO:The result content will be defined later
        updateFailResult(AgentInstallEventStage.ACCESS_AUTH.getError(),
                AgentInstallEventStage.ACCESS_AUTH.getStage(), agentInstallAgentEntity);
        return;
    }
    log.info("telnet host {} port {} success", configInfo.getHost(), configInfo.getSshPort());

    // check ssh
    SSH ssh = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
            sshKeyFile.getAbsolutePath(), configInfo.getHost(), "echo ok");
    if (!ssh.run()) {
        log.error("ssh is not available: {}", ssh.getErrorResponse());
        updateFailResult(AgentInstallEventStage.ACCESS_AUTH.getError(),
                AgentInstallEventStage.ACCESS_AUTH.getStage(), agentInstallAgentEntity);
        return;
    }
    log.info("ssh is available");
    updateProcessingResult(AgentInstallEventStage.ACCESS_AUTH.getMessage(),
            AgentInstallEventStage.ACCESS_AUTH.getStage(), agentInstallAgentEntity);

    // INSTALL_DIR_CHECK stage: create the install dir on the remote host when it does not exist yet.
    // NOTE(review): the install dir is concatenated unquoted into the shell command, so a path
    // containing spaces or shell metacharacters would break or alter it - confirm upstream validation.
    String checkFileExistCmd = "if test -e " + configInfo.getInstallDir()
            + "; then echo ok; else mkdir -p " + configInfo.getInstallDir() + " ;fi";
    ssh.setCommand(checkFileExistCmd);
    if (!ssh.run()) {
        log.error("installation path is not available:{}", ssh.getErrorResponse());
        updateFailResult(AgentInstallEventStage.INSTALL_DIR_CHECK.getError(),
                AgentInstallEventStage.INSTALL_DIR_CHECK.getStage(), agentInstallAgentEntity);
        return;
    }
    log.info("check installDir exist");
    updateProcessingResult(AgentInstallEventStage.INSTALL_DIR_CHECK.getMessage(),
            AgentInstallEventStage.INSTALL_DIR_CHECK.getStage(), agentInstallAgentEntity);

    // JDK_CHECK stage
    final String checkSystem = "cat /etc/os-release";
    ssh.setCommand(checkSystem);
    if (!ssh.run()) {
        log.error("View system failures: {}", ssh.getErrorResponse());
        // Record the failure so the event does not stay in a processing state with no result.
        updateFailResult(AgentInstallEventStage.JDK_CHECK.getError(),
                AgentInstallEventStage.JDK_CHECK.getStage(), agentInstallAgentEntity);
        return;
    }
    if (ssh.getStdoutResponse().toLowerCase(Locale.ROOT).contains("ubuntu")) {
        // The java check below sources ~/.bash_profile, so make sure the file exists on Ubuntu hosts.
        final String mkdirBashProfile = "if test -f ~/.bash_profile;then echo ok;else touch ~/.bash_profile;fi";
        ssh.setCommand(mkdirBashProfile);
        if (!ssh.run()) {
            log.error("Create file fail: {}", ssh.getErrorResponse());
            updateFailResult(AgentInstallEventStage.JDK_CHECK.getError(),
                    AgentInstallEventStage.JDK_CHECK.getStage(), agentInstallAgentEntity);
            return;
        }
        log.info("Create .bash_profile successfully");
    }
    final String checkJavaHome = "source /etc/profile && source ~/.bash_profile && java -version && echo $JAVA_HOME";
    ssh.setCommand(checkJavaHome);
    if (!ssh.run()) {
        log.error("jdk is not available: {}", ssh.getErrorResponse());
        updateFailResult(AgentInstallEventStage.JDK_CHECK.getError(),
                AgentInstallEventStage.JDK_CHECK.getStage(), agentInstallAgentEntity);
        return;
    }
    log.info("check jdk success");
    updateProcessingResult(AgentInstallEventStage.JDK_CHECK.getMessage(),
            AgentInstallEventStage.JDK_CHECK.getStage(), agentInstallAgentEntity);

    // AGENT_DEPLOY stage
    // TODO: How to get the agent installation package
    // The manager home is taken as three directory levels above the application source location,
    // and the agent package is expected in its "agent" sub directory.
    ApplicationHome applicationHome = new ApplicationHome();
    String dorisManagerHome = applicationHome.getSource()
            .getParentFile().getParentFile().getParentFile().toString();
    log.info("doris manager home : {}", dorisManagerHome);
    String agentPackageHome = dorisManagerHome + File.separator + "agent";
    // NOTE(review): the host has already been used by the telnet/ssh checks above, so this
    // check comes late; kept in place to preserve the existing failure behavior.
    Preconditions.checkNotNull(configInfo.getHost(), "host is empty");
    SCP scp = new SCP(configInfo.getSshUser(), configInfo.getSshPort(),
            sshKeyFile.getAbsolutePath(), configInfo.getHost(), agentPackageHome, configInfo.getInstallDir());
    if (!scp.run()) {
        log.error("scp agent package failed:{} to {}", agentPackageHome, configInfo.getInstallDir());
        updateFailResult(AgentInstallEventStage.AGENT_DEPLOY.getError(),
                AgentInstallEventStage.AGENT_DEPLOY.getStage(), agentInstallAgentEntity);
        return;
    }
    log.info("agent install success");
    updateProcessingResult(AgentInstallEventStage.AGENT_DEPLOY.getMessage(),
            AgentInstallEventStage.AGENT_DEPLOY.getStage(), agentInstallAgentEntity);

    // AGENT_START stage
    String agentInstallHome = configInfo.getInstallDir() + File.separator + "agent";
    log.info("to start agent with port {}", configInfo.getAgentPort());
    String command = "cd %s && sh %s --server %s --agent %d --port %d";
    String cmd = String.format(command, agentInstallHome, AGENT_START_SCRIPT,
            getServerAddr(), configInfo.getAgentNodeId(), configInfo.getAgentPort());
    SSH startSsh = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
            sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
    if (!startSsh.run()) {
        log.error("agent start failed:{}", startSsh.getErrorResponse());
        updateFailResult(AgentInstallEventStage.AGENT_START.getError(),
                AgentInstallEventStage.AGENT_START.getStage(), agentInstallAgentEntity);
        return;
    }
    log.info("agent start success");
    updateProcessingResult(AgentInstallEventStage.AGENT_START.getMessage(),
            AgentInstallEventStage.AGENT_START.getStage(), agentInstallAgentEntity);
}