in hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsDataStorage.java [130:243]
public void saveData(CollectRep.MetricsData metricsData) {
if (!isServerAvailable()) {
serverAvailable = checkVictoriaMetricsDatasourceAvailable();
}
if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
return;
}
if (metricsData.getValues().isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} {} {} is null, ignore.",
metricsData.getId(), metricsData.getApp(), metricsData.getMetrics());
return;
}
Map<String, String> defaultLabels = Maps.newHashMapWithExpectedSize(8);
defaultLabels.put(MONITOR_METRICS_KEY, metricsData.getMetrics());
boolean isPrometheusAuto = false;
if (metricsData.getApp().startsWith(CommonConstants.PROMETHEUS_APP_PREFIX)) {
isPrometheusAuto = true;
defaultLabels.remove(MONITOR_METRICS_KEY);
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp()
.substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
} else {
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp());
}
defaultLabels.put(LABEL_KEY_INSTANCE, String.valueOf(metricsData.getId()));
List<VictoriaMetricsContent> contentList = new LinkedList<>();
try {
final int fieldSize = metricsData.getFields().size();
Long[] timestamp = new Long[]{metricsData.getTime()};
Map<String, Double> fieldsValue = Maps.newHashMapWithExpectedSize(fieldSize);
Map<String, String> labels = Maps.newHashMapWithExpectedSize(fieldSize);
RowWrapper rowWrapper = metricsData.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
fieldsValue.clear();
labels.clear();
rowWrapper.cellStream().forEach(cell -> {
String value = cell.getValue();
boolean isLabel = cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
if (type == CommonConstants.TYPE_NUMBER && !isLabel) {
// number metrics data
if (!CommonConstants.NULL_VALUE.equals(value)) {
fieldsValue.put(cell.getField().getName(), CommonUtil.parseStrDouble(value));
}
}
// label
if (isLabel && !CommonConstants.NULL_VALUE.equals(value)) {
labels.put(cell.getField().getName(), value);
}
});
for (Map.Entry<String, Double> entry : fieldsValue.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
try {
labels.putAll(defaultLabels);
String labelName = isPrometheusAuto ? metricsData.getMetrics()
: metricsData.getMetrics() + SPILT + entry.getKey();
labels.put(LABEL_KEY_NAME, labelName);
if (!isPrometheusAuto) {
labels.put(MONITOR_METRIC_KEY, entry.getKey());
}
VictoriaMetricsContent content = VictoriaMetricsContent.builder()
.metric(new HashMap<>(labels))
.values(new Double[]{entry.getValue()})
.timestamps(timestamp)
.build();
contentList.add(content);
} catch (Exception e) {
log.error("combine metrics data error: {}.", e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
log.error("save metrics data to victoria error: {}.", e.getMessage(), e);
}
if (contentList.isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} is empty, ignore.", metricsData.getId());
return;
}
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (StringUtils.hasText(victoriaMetricsProp.username())
&& StringUtils.hasText(victoriaMetricsProp.password())) {
String authStr = victoriaMetricsProp.username() + ":" + victoriaMetricsProp.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC + SignConstants.BLANK + encodedAuth);
}
StringBuilder stringBuilder = new StringBuilder();
for (VictoriaMetricsContent content : contentList) {
stringBuilder.append(JsonUtil.toJson(content)).append("\n");
}
HttpEntity<String> httpEntity = new HttpEntity<>(stringBuilder.toString(), headers);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(victoriaMetricsProp.url() + IMPORT_PATH,
httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("insert metrics data to victoria-metrics success.");
} else {
log.error("insert metrics data to victoria-metrics failed. {}", responseEntity.getBody());
}
} catch (Exception e){
log.error("flush metrics data to victoria-metrics error: {}.", e.getMessage(), e);
}
}