in hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterDataStorage.java [149:267]
public void saveData(CollectRep.MetricsData metricsData) {
if (!isServerAvailable()) {
serverAvailable = checkVictoriaMetricsDatasourceAvailable();
}
if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
return;
}
if (metricsData.getValues().isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} {} {} is null, ignore.",
metricsData.getId(), metricsData.getApp(), metricsData.getMetrics());
return;
}
Map<String, String> defaultLabels = Maps.newHashMapWithExpectedSize(8);
defaultLabels.put(MONITOR_METRICS_KEY, metricsData.getMetrics());
boolean isPrometheusAuto;
if (metricsData.getApp().startsWith(CommonConstants.PROMETHEUS_APP_PREFIX)) {
isPrometheusAuto = true;
defaultLabels.remove(MONITOR_METRICS_KEY);
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp()
.substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
} else {
isPrometheusAuto = false;
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp());
}
defaultLabels.put(LABEL_KEY_INSTANCE, String.valueOf(metricsData.getId()));
try {
List<CollectRep.Field> fieldList = metricsData.getFields();
Long[] timestamp = new Long[]{metricsData.getTime()};
Map<String, Double> fieldsValue = Maps.newHashMapWithExpectedSize(fieldList.size());
Map<String, String> labels = Maps.newHashMapWithExpectedSize(fieldList.size());
List<VictoriaMetricsDataStorage.VictoriaMetricsContent> contentList = new LinkedList<>();
RowWrapper rowWrapper = metricsData.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
fieldsValue.clear();
labels.clear();
rowWrapper.cellStream().forEach(cell -> {
String value = cell.getValue();
Byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
Boolean label = cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
if (type == CommonConstants.TYPE_NUMBER && !label) {
// number metrics data
if (!CommonConstants.NULL_VALUE.equals(value)) {
fieldsValue.put(cell.getField().getName(), CommonUtil.parseStrDouble(value));
}
}
// label
if (label && !CommonConstants.NULL_VALUE.equals(value)) {
labels.put(cell.getField().getName(), value);
}
for (Map.Entry<String, Double> entry : fieldsValue.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
try {
labels.putAll(defaultLabels);
String labelName = isPrometheusAuto ? metricsData.getMetrics()
: metricsData.getMetrics() + SPILT + entry.getKey();
labels.put(LABEL_KEY_NAME, labelName);
if (!isPrometheusAuto) {
labels.put(MONITOR_METRIC_KEY, entry.getKey());
}
VictoriaMetricsDataStorage.VictoriaMetricsContent content = VictoriaMetricsDataStorage.VictoriaMetricsContent.builder()
.metric(new HashMap<>(labels))
.values(new Double[]{entry.getValue()})
.timestamps(timestamp)
.build();
contentList.add(content);
} catch (Exception e) {
log.error("combine metrics data error: {}.", e.getMessage(), e);
}
}
}
});
}
if (contentList.isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} is empty, ignore.", metricsData.getId());
return;
}
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (StringUtils.hasText(vmInsertProps.username())
&& StringUtils.hasText(vmInsertProps.password())) {
String authStr = vmInsertProps.username() + ":" + vmInsertProps.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC + SignConstants.BLANK + encodedAuth);
}
StringBuilder stringBuilder = new StringBuilder();
for (VictoriaMetricsDataStorage.VictoriaMetricsContent content : contentList) {
stringBuilder.append(JsonUtil.toJson(content)).append("\n");
}
HttpEntity<String> httpEntity = new HttpEntity<>(stringBuilder.toString(), headers);
String importUrl = vmClusterProps.insert().url() + VM_INSERT_BASE_PATH.formatted(vmClusterProps.accountID(), IMPORT_PATH);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(importUrl,
httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("insert metrics data to victoria-metrics success.");
} else {
log.error("insert metrics data to victoria-metrics failed. {}", responseEntity.getBody());
}
} catch (Exception e){
log.error("flush metrics data to victoria-metrics error: {}.", e.getMessage(), e);
}
} catch (Exception e) {
log.error("flush metrics data to victoria-metrics error: {}.", e.getMessage(), e);
}
}