in hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java [120:192]
public void saveData(CollectRep.MetricsData metricsData) {
if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
return;
}
if (metricsData.getValues().isEmpty()) {
log.info("[warehouse greptime] flush metrics data {} {}is null, ignore.", metricsData.getId(), metricsData.getMetrics());
return;
}
String monitorId = String.valueOf(metricsData.getId());
String tableName = getTableName(metricsData.getApp(), metricsData.getMetrics());
TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(tableName);
tableSchemaBuilder.addTag("instance", DataType.String)
.addTimestamp("ts", DataType.TimestampMillisecond);
List<CollectRep.Field> fields = metricsData.getFields();
fields.forEach(field -> {
if (field.getLabel()) {
tableSchemaBuilder.addTag(field.getName(), DataType.String);
} else {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
tableSchemaBuilder.addField(field.getName(), DataType.Float64);
} else if (field.getType() == CommonConstants.TYPE_STRING) {
tableSchemaBuilder.addField(field.getName(), DataType.String);
}
}
});
Table table = Table.from(tableSchemaBuilder.build());
long now = System.currentTimeMillis();
Object[] values = new Object[2 + fields.size()];
values[0] = monitorId;
values[1] = now;
RowWrapper rowWrapper = metricsData.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
AtomicInteger index = new AtomicInteger(-1);
rowWrapper.cellStream().forEach(cell -> {
index.getAndIncrement();
if (CommonConstants.NULL_VALUE.equals(cell.getValue())) {
values[2 + index.get()] = null;
return;
}
Boolean label = cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
Byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
if (label) {
values[2 + index.get()] = cell.getValue();
} else {
if (type == CommonConstants.TYPE_NUMBER) {
values[2 + index.get()] = Double.parseDouble(cell.getValue());
} else if (type == CommonConstants.TYPE_STRING) {
values[2 + index.get()] = cell.getValue();
}
}
});
table.addRow(values);
}
CompletableFuture<Result<WriteOk, Err>> writeFuture = greptimeDb.write(table);
try {
Result<WriteOk, Err> result = writeFuture.get(10, TimeUnit.SECONDS);
if (result.isOk()) {
log.debug("[warehouse greptime]-Write successful");
} else {
log.warn("[warehouse greptime]--Write failed: {}", result.getErr());
}
} catch (Throwable throwable) {
log.error("[warehouse greptime]--Error occurred: {}", throwable.getMessage());
}
}