in ozhera-monitor/ozhera-monitor-service/src/main/java/org/apache/ozhera/monitor/service/prometheus/PrometheusService.java [1194:1277]
private Result loadTypeOracle(String mode) {
//负载动态扩缩绒模式
log.info("PrometheusService.loadTypeOracle mode : {}", mode);
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 5, TimeUnit.MINUTES, new LinkedBlockingQueue(100),
(Runnable r) -> new Thread(r, "compute-execute-thread-v2"), new ThreadPoolExecutor.CallerRunsPolicy());
AppCapacityAutoAdjust autoQuery = new AppCapacityAutoAdjust();
autoQuery.setStatus(0); //0表示开启状态
autoQuery.setAutoCapacity(1); //开启自动扩容的
try {
List<AppCapacityAutoAdjust> result = appCapacityAutoAdjustDao.query(autoQuery, null, null);
//获取container字段,并且开始遍历查询prometheus
result.stream().forEach(res -> {
String container = res.getContainer();
int pipelineId = res.getPipelineId();
String query = "sum(container_spec_cpu_quota{system=\"mione\",container=\"" + container + "\",image != \"\"}) by (container)" +
" / sum(container_spec_cpu_period{system=\"mione\",container=\"" + container + "\",image != \"\"}) by (container)";
executor.execute(() -> {
Map<String, Object> map = new HashMap<>();
map.put(P_QUERY, query); //指标参数
map.put(P_TIME, System.currentTimeMillis() / 1000L);
String data = restTemplateService.getHttpM(completeQueryUrl(prometheusUrl, URI_QUERY_MOMENT), map);
// System.out.println(data);
MetricResponseVector metricResult = new Gson().fromJson(data, MetricResponseVector.class);
if (metricResult == null || !"success".equals(metricResult.getStatus())) {
return;
}
List<MetricDataSetVector> metricSet = metricResult.getData().getResult();
//metricSet只有一条元素,因为container只有一个
int cpuCoreSize = Integer.parseInt(metricSet.get(0).getValue().get(1));
String loadQuery = "";
switch (mode) {
case "normal":
loadQuery = "avg(container_cpu_load_average_10s{system=\"mione\",image != \"\",container=\"" + container + "\"}) by (container,namespace) /1000";
break;
case "predict":
loadQuery = "";
break;
default:
loadQuery = "";
}
map.put(P_QUERY, loadQuery); //指标参数
String loadData = restTemplateService.getHttpM(completeQueryUrl(prometheusUrl, URI_QUERY_MOMENT), map);
MetricResponseVector loadMetricResult = new Gson().fromJson(loadData, MetricResponseVector.class);
if (loadMetricResult == null || !"success".equals(loadMetricResult.getStatus())) {
return;
}
List<MetricDataSetVector> loadMetricSet = loadMetricResult.getData().getResult();
Double podRealLoad = Double.parseDouble(loadMetricSet.get(0).getValue().get(1));
//和负载阈值比较
if (cpuCoreSize * LOAD_THRESHOLD_ORACLE < podRealLoad) {
//eg .10cpu * 0.7 = 7 < 10 则要扩容
log.info("container : {} ,pipeline: {},需要扩容了 podRealLoad : {} ", container, pipelineId, podRealLoad);
//查询实例数量
String instanceQuery = "count(container_cpu_load_average_10s{system=\"mione\",image !=\"\",container=\"" + container + "\"}) by (pod) ";
map.put(P_QUERY, instanceQuery);
String instanceData = restTemplateService.getHttpM(completeQueryUrl(prometheusUrl, URI_QUERY_MOMENT), map);
MetricResponseVector instanceMetricResult = new Gson().fromJson(instanceData, MetricResponseVector.class);
if (instanceMetricResult == null || !"success".equals(instanceMetricResult.getStatus())) {
return;
}
List<MetricDataSetVector> instanceMetricSet = instanceMetricResult.getData().getResult();
int instanceNum = Integer.parseInt(instanceMetricSet.get(0).getValue().get(1));
MoneSpec moneSpec = new MoneSpec();
moneSpec.init();
moneSpec.setEnvID(pipelineId);
moneSpec.setNamespace(loadMetricSet.get(0).getMetric().getNamespace());
moneSpec.setContainer(loadMetricSet.get(0).getMetric().getContainer());
moneSpec.setReplicas(instanceNum);
moneSpec.setSetReplicas(CountExpectedInstance(instanceNum, "normal",res.getMaxInstance()));
//发送消息
capacityAdjustMessageService.product(moneSpec);
// capacityService.capacityAdjustWithRecord(moneSpec);
// AppCapacityAutoAdjust byId = appCapacityAutoAdjustDao.getById(res.getId());
// log.info("container : {} ,pipeline: {} 扩容mq为: {},queue size : {},AppCapacityAutoAdjust:{}",container, pipelineId, moneSpec.toString(),capacityAdjustMessageService.queueSize(),byId.toString());
} else {
log.info("container : {} ,pipeline: {},不需要扩容了 podRealLoad : {} ", container, pipelineId, podRealLoad);
}
});
});
} catch (Exception e) {
log.error("loadTypeOracle error : {}", e.toString());
}
return Result.success(0);
}