public void startOperation()

in manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java [110:180]


    /**
     * Installs and starts the agent on every node of a resource cluster.
     *
     * <p>Before anything is installed, every node is checked concurrently for SSH connectivity
     * and for whether its agent port is free. All checks are awaited; if any node fails, nothing
     * is installed and a single exception listing every failing node is thrown.
     *
     * @param resourceClusterId id of the resource cluster whose nodes receive the agent
     * @param requestId id forwarded to each node's agent install operation
     * @throws java.util.NoSuchElementException if no resource cluster exists with the given id
     * @throws Exception if any node fails the pre-install checks (the message lists each failure
     *         as {@code host:port, reason}, one per line), if the calling thread is interrupted
     *         while waiting for the checks, or if an install operation fails
     */
    public void startOperation(long resourceClusterId, long requestId) throws Exception {

        log.info("start resource cluster {} all nodes agent", resourceClusterId);
        ResourceClusterEntity clusterEntity = resourceClusterRepository.findById(resourceClusterId)
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "resource cluster " + resourceClusterId + " not found"));
        PMResourceClusterAccessInfo accessInfo = JSON.parseObject(clusterEntity.getAccessInfo(),
                PMResourceClusterAccessInfo.class);
        // TODO: The path can be set separately for each machine later
        List<ResourceNodeEntity> nodeEntities = nodeRepository.getByResourceClusterId(resourceClusterId);

        verifyNodesBeforeAgentInstall(resourceClusterId, nodeEntities, accessInfo);

        log.debug("install agent for resource cluster {} all nodes", resourceClusterId);
        for (ResourceNodeEntity nodeEntity : nodeEntities) {
            log.info("start to install agent to {} node {}", nodeEntity.getId(), nodeEntity.getHost());
            nodeAndAgentManager.installAgentOperation(nodeEntity, newAgentInstallConfig(accessInfo), requestId);
        }
    }

    /**
     * Concurrently checks SSH connectivity and agent-port availability on every node, then waits
     * for all checks so that every failure is reported together.
     *
     * <p>Checking the port before installing cannot guarantee it is still free when the agent
     * starts, but it exposes an already-occupied port early.
     *
     * @throws Exception listing each failing node as {@code host:port, reason}, one per line, or
     *         if the calling thread is interrupted while waiting
     */
    private void verifyNodesBeforeAgentInstall(long resourceClusterId, List<ResourceNodeEntity> nodeEntities,
                                               PMResourceClusterAccessInfo accessInfo) throws Exception {
        log.debug("check agent port for resource cluster {} all nodes", resourceClusterId);

        // NOTE(review): these tasks perform blocking SSH I/O on ForkJoinPool.commonPool();
        // consider a dedicated executor if clusters grow large.
        List<Pair<ResourceNodeEntity, CompletableFuture<Boolean>>> nodeFutures = new ArrayList<>();
        for (ResourceNodeEntity nodeEntity : nodeEntities) {
            // A fresh config per node, so concurrent tasks never share a mutable object.
            AgentInstallEventConfigInfo configInfo = newAgentInstallConfig(accessInfo);
            CompletableFuture<Boolean> portCheckFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    nodeAndAgentManager.checkSshConnect(nodeEntity, configInfo);
                    return nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo);
                } catch (Exception e) {
                    // Throwable as the last, unconsumed argument so the stack trace is logged.
                    log.error("check node {} exception", nodeEntity.getHost(), e);
                    throw new CompletionException(e);
                }
            });
            nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
        }

        StringBuilder failures = new StringBuilder();
        for (Pair<ResourceNodeEntity, CompletableFuture<Boolean>> nodeFuture : nodeFutures) {
            ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
            String reason;
            try {
                Boolean available = nodeFuture.getRight().get();
                if (Boolean.TRUE.equals(available)) {
                    continue;
                }
                reason = "port already in use";
            } catch (InterruptedException e) {
                // Restore the interrupt flag and abort rather than swallowing the interruption.
                Thread.currentThread().interrupt();
                throw new Exception("interrupted while checking agent ports of resource cluster "
                        + resourceClusterId, e);
            } catch (Exception e) {
                // get() wraps the task's failure in an ExecutionException; report the real cause.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                reason = cause.toString();
            }
            log.error("node {}:{} check failed: {}", nodeEntity.getHost(), nodeEntity.getAgentPort(), reason);
            failures.append(String.format("%s:%d, %s", nodeEntity.getHost(), nodeEntity.getAgentPort(), reason))
                    .append("\n");
        }

        if (failures.length() > 0) {
            log.error("check node exception list: {}\n", failures);
            throw new Exception(failures.toString());
        }
    }

    /** Builds a new agent install config carrying the cluster's SSH user, port and key. */
    private static AgentInstallEventConfigInfo newAgentInstallConfig(PMResourceClusterAccessInfo accessInfo) {
        AgentInstallEventConfigInfo configInfo = new AgentInstallEventConfigInfo();
        configInfo.setSshUser(accessInfo.getSshUser());
        configInfo.setSshPort(accessInfo.getSshPort());
        configInfo.setSshKey(accessInfo.getSshKey());
        return configInfo;
    }