in manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java [73:160]
public void configOperation(long clusterId, DorisClusterModuleDeployConfig deployConfig) {
String moduleName = deployConfig.getModuleName();
log.info("config module name {} for cluster {}", moduleName, clusterId);
List<ClusterModuleEntity> moduleEntities = clusterModuleRepository.getByClusterIdAndModuleName(clusterId, moduleName);
// Step fallback operation
// If it has been configured before, you need to delete the service information
for (ClusterModuleEntity moduleEntity : moduleEntities) {
serviceRepository.deleteByModuleId(moduleEntity.getId());
}
ClusterModuleEntity moduleEntity = moduleEntities.get(0);
moduleEntity.setConfig(JSON.toJSONString(deployConfig));
// add service for module
List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByModuleId(moduleEntity.getId());
List<String> accessInfo = new ArrayList<>();
for (ClusterInstanceEntity instanceEntity : instanceEntities) {
accessInfo.add(instanceEntity.getAddress());
}
Map<String, Integer> serviceNamePorts = new HashMap<>();
int editLogPort = 0;
List<String> followerIp = new ArrayList<>();
List<String> observerIps = new ArrayList<>();
if (moduleName.equals(ServerAndAgentConstant.FE_NAME)) {
// for fe service,jdbc and http
Map<String, Integer> editServiceNamePort = new HashMap<>();
for (DeployConfigItem configItem : deployConfig.getConfigs()) {
if (configItem.getKey().equals(ConfigDefault.FE_HTTP_PORT_CONFIG_NAME)) {
serviceNamePorts.put(ServerAndAgentConstant.FE_HTTP_SERVICE, Integer.valueOf(configItem.getValue()));
}
if (configItem.getKey().equals(ConfigDefault.FE_QUERY_PORT_CONFIG_NAME)) {
serviceNamePorts.put(ServerAndAgentConstant.FE_JDBC_SERVICE, Integer.valueOf(configItem.getValue()));
}
if (configItem.getKey().equals(ConfigDefault.FE_EDIT_LOG_PORT)) {
editLogPort = Integer.valueOf(configItem.getValue());
editServiceNamePort.put(ServerAndAgentConstant.FE_EDIT_SERVICE, editLogPort);
}
}
// Set follower or observer
// Followers are stored in the extra information of instance
int index = 0;
String followerEndpoint = "";
for (ClusterInstanceEntity instanceEntity : instanceEntities) {
if (index < 1) {
followerEndpoint = instanceEntity.getAddress() + ":" + editLogPort;
followerIp.add(instanceEntity.getAddress());
} else {
instanceEntity.setExtraInfo(followerEndpoint);
instanceRepository.save(instanceEntity);
observerIps.add(instanceEntity.getAddress());
}
index++;
}
serviceCreateOperation(moduleEntity, serviceNamePorts, followerIp);
// TODO:Modify it when the Fe capacity is expanded
serviceCreateOperation(moduleEntity, editServiceNamePort, observerIps);
} else if (moduleName.equals(ServerAndAgentConstant.BE_NAME)) {
// for be service, heartbeat
for (DeployConfigItem configItem : deployConfig.getConfigs()) {
if (configItem.getKey().equals(ConfigDefault.BE_HEARTBEAT_PORT_CONFIG_NAME)) {
serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE,
Integer.valueOf(configItem.getValue()));
}
if (configItem.getKey().equals(ConfigDefault.BE_WEBSERVER_PORT_NAME)) {
serviceNamePorts.put(ServerAndAgentConstant.BE_HTTP_SERVICE,
Integer.valueOf(configItem.getValue()));
}
}
serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo);
} else {
// for broker service, rpc
for (DeployConfigItem configItem : deployConfig.getConfigs()) {
if (configItem.getKey().equals(ConfigDefault.BROKER_PORT_CONFIG_NAME)) {
serviceNamePorts.put(ServerAndAgentConstant.BROKER_PRC_SERVICE, Integer.valueOf(configItem.getValue()));
}
}
serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo);
}
clusterModuleRepository.save(moduleEntity);
}