in ozhera-operator/ozhera-operator-service/src/main/java/org/apache/ozhera/operator/handler/HeraResourceEventHandler.java [93:195]
/**
 * Handles the "add" event for a {@code HeraBootstrap} resource.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>Return early when the resource already carries {@code HeraStatus.STATUS_SUCCESS}.</li>
 *   <li>Group the spec's resources by the order of their {@code ResourceTypeEnum}; the
 *       {@link TreeMap} makes the groups iterate in ascending order.</li>
 *   <li>For every group: apply each resource, sleep, run {@code block2checkStatus}, then run
 *       the type-specific initialization (nacos, es, mysql, rocketmq) for each resource.</li>
 *   <li>Patch the resource status to success, or to failed (with the stack trace as message)
 *       when anything above throws.</li>
 * </ol>
 *
 * <p>No exception raised inside the {@code try} block is propagated to the caller; failures are
 * logged and reported through the patched status instead.
 *
 * @param heraBootstrap the added resource; must be non-null with non-null spec and metadata
 */
public void onAdd(HeraBootstrap heraBootstrap) {
    try {
        Preconditions.checkArgument(null != heraBootstrap);
        Preconditions.checkArgument(null != heraBootstrap.getSpec());
        Preconditions.checkArgument(null != heraBootstrap.getMetadata());
        log.info("### on add hera{}", heraBootstrap);
        if (null != heraBootstrap.getStatus() && HeraStatus.STATUS_SUCCESS == heraBootstrap.getStatus().getStatus()) {
            log.warn("### HERA platform initialized, no need to init again");
            return;
        }
        HeraSpec heraSpec = heraBootstrap.getSpec();
        ObjectMeta objectMeta = heraBootstrap.getMetadata();
        List<HeraResource> heraResourceList = heraSpec.getResourceList();
        // Fail with a readable message instead of a bare NullPointerException from stream().
        Preconditions.checkArgument(null != heraResourceList, "hera resource list can not be null");
        //0. Group by resource order
        TreeMap<Integer, List<HeraResource>> groupHrList = heraResourceList.stream()
                .collect(Collectors.groupingBy(
                        h -> ResourceTypeEnum.typeOf(h.getResourceType()).getOrder(),
                        TreeMap::new,
                        Collectors.toList()));
        // Seconds to wait after applying a group; shrinks by one for every processed group.
        // TimeUnit.sleep does not sleep at all for values <= 0.
        int step = 10;
        for (Map.Entry<Integer, List<HeraResource>> entrySet : groupHrList.entrySet()) {
            List<HeraResource> hrList = entrySet.getValue();
            log.warn("hera operator add, applyResource resourceType:{}, HeraResource size:{}", entrySet.getKey(), hrList.size());
            //1. deploy
            for (HeraResource heraResource : hrList) {
                k8sUtilBean.applyResource(heraResource, objectMeta, "add");
            }
            //2. Checking deployment status
            TimeUnit.SECONDS.sleep(step--);
            block2checkStatus(objectMeta);
            //3. Resource initialization
            for (HeraResource heraResource : hrList) {
                String resourceType = heraResource.getResourceType();
                // nacos configuration initialization
                if (ResourceTypeEnum.Nacos.getTypeName().equals(resourceType)) {
                    Map<String, String> config = connectionConfigOf(heraResource, "nacos");
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_NACOS_ADDRESS)), String.format("nacos connection config:%s can not be null", HoConstant.KEY_NACOS_ADDRESS));
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_NACOS_PASSWORD)), String.format("nacos connection config:%s can not be null", HoConstant.KEY_NACOS_PASSWORD));
                    // nacos initialization
                    initNacos("add", config.get(HoConstant.KEY_NACOS_ADDRESS), config.get(HoConstant.KEY_NACOS_PASSWORD), heraResource.getPropList());
                }
                // es configuration initialization
                if (ResourceTypeEnum.ES.getTypeName().equals(resourceType)) {
                    Map<String, String> config = connectionConfigOf(heraResource, "es");
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_ES_URL)), String.format("es connection config:%s can not be null", HoConstant.KEY_ES_URL));
                    initES(config.get(HoConstant.KEY_ES_URL), config.get(HoConstant.KEY_ES_USERNAME), config.get(HoConstant.KEY_ES_PASSWORD));
                }
                // mysql configuration initialization
                if (ResourceTypeEnum.MYSQL.getTypeName().equals(resourceType)) {
                    Map<String, String> config = connectionConfigOf(heraResource, "mysql");
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_DATASOURCE_URL)), String.format("mysql connection config:%s can not be null", HoConstant.KEY_DATASOURCE_URL));
                    // The username/password messages previously said "es", which pointed at the wrong resource.
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_DATASOURCE_USERNAME)), String.format("mysql connection config:%s can not be null", HoConstant.KEY_DATASOURCE_USERNAME));
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_DATASOURCE_PASSWORD)), String.format("mysql connection config:%s can not be null", HoConstant.KEY_DATASOURCE_PASSWORD));
                    initSql("add", config.get(HoConstant.KEY_DATASOURCE_URL), config.get(HoConstant.KEY_DATASOURCE_USERNAME), config.get(HoConstant.KEY_DATASOURCE_PASSWORD));
                }
                // rocketmq configuration initialization
                if (ResourceTypeEnum.ROCKETMQ.getTypeName().equals(resourceType)) {
                    Map<String, String> config = connectionConfigOf(heraResource, "rocketmq");
                    Preconditions.checkArgument(StringUtils.isNotEmpty(config.get(HoConstant.KEY_ROCKETMQ_NAMESERVER)), String.format("rocketmq nameserver config:%s can not be null", HoConstant.KEY_ROCKETMQ_NAMESERVER));
                    initRocketMQ(config.get(HoConstant.KEY_ROCKETMQ_NAMESERVER));
                }
            }
        }
        HeraStatus heraStatus = new HeraStatus();
        heraStatus.setStatus(HeraStatus.STATUS_SUCCESS);
        heraStatus.setMsg("success");
        heraBootstrap.setStatus(heraStatus);
        heraClient.patchStatus(heraBootstrap);
        log.warn("hera operator onAdd success");
    } catch (Throwable e) {
        log.error("hera operator onAdd error:", e);
        try {
            // heraBootstrap is null when the very first precondition failed; there is nothing to patch then.
            if (null != heraBootstrap) {
                HeraStatus heraStatus = new HeraStatus();
                heraStatus.setStatus(HeraStatus.STATUS_FAILED);
                heraStatus.setMsg(ExceptionUtils.getStackTrace(e));
                heraBootstrap.setStatus(heraStatus);
                heraClient.patchStatus(heraBootstrap);
            }
        } finally {
            // Restore the interrupt flag swallowed by the broad catch, after the status patch
            // so the patch call itself does not run on an interrupted thread.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

// Flattens a resource's connection list (entries read via their "key" and "value" fields) into one lookup map.
private Map<String, String> connectionConfigOf(HeraResource heraResource, String resourceLabel) {
    Preconditions.checkArgument(null != heraResource.getConnectionMapList(), resourceLabel + " connection kv config can not be null");
    Map<String, String> config = new HashMap<>();
    heraResource.getConnectionMapList().forEach(map -> config.put(map.get("key"), map.get("value")));
    return config;
}