in manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java [110:180]
/**
 * Pre-checks the agent port on every node of a resource cluster and, when all checks
 * pass, triggers the agent installation on each node.
 *
 * <p>The port check is only an early warning: it cannot guarantee the port is still
 * free when the agent is actually started, but it surfaces an occupied port before any
 * installation work begins.
 *
 * @param resourceClusterId id of the resource cluster whose nodes are processed
 * @param requestId request id handed to {@code installAgentOperation} for every node
 * @throws java.util.NoSuchElementException if no resource cluster with the given id is found
 * @throws InterruptedException if the calling thread is interrupted while waiting for a
 *         node check; the interrupt flag is restored before rethrowing
 * @throws Exception if at least one node check fails (one line per failed node in the
 *         message), or if an install call fails
 */
public void startOperation(long resourceClusterId, long requestId) throws Exception {
    log.info("start resource cluster {} all nodes agent", resourceClusterId);
    // Same exception type as a bare Optional.get(), but the message names the missing id.
    ResourceClusterEntity clusterEntity = resourceClusterRepository.findById(resourceClusterId)
            .orElseThrow(() -> new java.util.NoSuchElementException(
                    "resource cluster " + resourceClusterId + " does not exist"));
    PMResourceClusterAccessInfo accessInfo = JSON.parseObject(clusterEntity.getAccessInfo(),
            PMResourceClusterAccessInfo.class);
    // TODO:The path can be set separately for each machine later
    List<ResourceNodeEntity> nodeEntities = nodeRepository.getByResourceClusterId(resourceClusterId);

    log.debug("check agent port for resource cluster {} all nodes", resourceClusterId);
    checkAgentPortsAvailable(nodeEntities, accessInfo);

    log.debug("install agent for resource cluster {} all nodes", resourceClusterId);
    for (ResourceNodeEntity nodeEntity : nodeEntities) {
        log.info("start to install agent to {} node {}", nodeEntity.getId(), nodeEntity.getHost());
        nodeAndAgentManager.installAgentOperation(nodeEntity, newSshConfigInfo(accessInfo), requestId);
    }
}

/**
 * Runs the SSH connectivity and agent-port checks for all nodes concurrently and fails
 * if any of them does not pass.
 *
 * @param nodeEntities nodes to check
 * @param accessInfo cluster access info that supplies the SSH user, port and key
 * @throws InterruptedException if interrupted while waiting for a check result
 * @throws Exception if one or more nodes failed; the message lists every failed node
 */
private void checkAgentPortsAvailable(List<ResourceNodeEntity> nodeEntities,
                                      PMResourceClusterAccessInfo accessInfo) throws Exception {
    // Submit every check first so they run in parallel (no executor is passed, so the
    // JDK default async pool is used), then collect the results below.
    List<Pair<ResourceNodeEntity, CompletableFuture<Boolean>>> nodeFutures = new ArrayList<>();
    for (ResourceNodeEntity nodeEntity : nodeEntities) {
        AgentInstallEventConfigInfo configInfo = newSshConfigInfo(accessInfo);
        CompletableFuture<Boolean> portCheckFuture = CompletableFuture.supplyAsync(() -> {
            try {
                nodeAndAgentManager.checkSshConnect(nodeEntity, configInfo);
                return nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo);
            } catch (Exception e) {
                log.error("check node {} exception: {}", nodeEntity.getHost(), e.getMessage());
                throw new CompletionException(e);
            }
        });
        nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
    }

    // Wait for every node so that all failures are reported together, not just the first.
    StringBuilder exStrBuilder = new StringBuilder();
    for (Pair<ResourceNodeEntity, CompletableFuture<Boolean>> nodeFuture : nodeFutures) {
        ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
        Exception failure = null;
        try {
            boolean isAvailablePort = nodeFuture.getRight().get();
            if (!isAvailablePort) {
                log.error("node {}:{} port already in use", nodeEntity.getHost(), nodeEntity.getAgentPort());
                // Recorded as a failure without throwing; only used to format the report.
                failure = new Exception(String.format("node %s:%d port already in use",
                        nodeEntity.getHost(), nodeEntity.getAgentPort()));
            }
        } catch (InterruptedException e) {
            // Do not treat an interrupt as a node failure: restore the flag and stop waiting.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            failure = e;
        }
        if (failure != null) {
            // toString() fills the third placeholder; the trailing throwable is the
            // attached exception, so the call does not depend on how the logger treats
            // a lone Throwable argument.
            log.error("node {}:{} check exception {}", nodeEntity.getHost(), nodeEntity.getAgentPort(),
                    failure.toString(), failure);
            exStrBuilder.append(String.format("%s:%d, %s",
                    nodeEntity.getHost(), nodeEntity.getAgentPort(), failure));
            exStrBuilder.append("\n");
        }
    }
    if (exStrBuilder.length() > 0) {
        log.error("check node exception list: {}\n", exStrBuilder);
        throw new Exception(exStrBuilder.toString());
    }
}

/**
 * Builds a fresh install config carrying the cluster's SSH user, port and key.
 * A new instance is created on every call so that nodes never share a config object.
 *
 * @param accessInfo cluster access info to copy the SSH settings from
 * @return a new config populated with the SSH settings
 */
private AgentInstallEventConfigInfo newSshConfigInfo(PMResourceClusterAccessInfo accessInfo) {
    AgentInstallEventConfigInfo configInfo = new AgentInstallEventConfigInfo();
    configInfo.setSshUser(accessInfo.getSshUser());
    configInfo.setSshPort(accessInfo.getSshPort());
    configInfo.setSshKey(accessInfo.getSshKey());
    return configInfo;
}