in hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbDataStorage.java [177:259]
public void saveData(CollectRep.MetricsData metricsData) {
if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
return;
}
if (metricsData.getValues().isEmpty()) {
log.info("[warehouse iotdb] flush metrics data {} is null, ignore.", metricsData.getId());
return;
}
List<MeasurementSchema> schemaList = new ArrayList<>();
Map<String, Tablet> tabletMap = Maps.newHashMapWithExpectedSize(8);
// todo Measurement schema is a data structure that is generated on the client side, and encoding and compression have no effect
try {
metricsData.getFields().forEach(field -> {
MeasurementSchema schema = new MeasurementSchema();
schema.setMeasurementId(field.getName());
byte type = (byte) field.getType();
// handle field type
if (type == CommonConstants.TYPE_NUMBER) {
schema.setType(TSDataType.DOUBLE);
} else if (type == CommonConstants.TYPE_STRING) {
schema.setType(TSDataType.TEXT);
}
schemaList.add(schema);
});
long now = System.currentTimeMillis();
RowWrapper rowWrapper = metricsData.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
Map<String, String> labels = Maps.newHashMapWithExpectedSize(8);
rowWrapper.cellStream().forEach(cell -> {
if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL) && !CommonConstants.NULL_VALUE.equals(cell.getValue())) {
labels.put(cell.getField().getName(), cell.getValue());
}
});
String label = JsonUtil.toJson(labels);
String deviceId = getDeviceId(metricsData.getApp(), metricsData.getMetrics(), metricsData.getId(), label, false);
if (tabletMap.containsKey(label)) {
// Avoid Time repeats
now++;
} else {
tabletMap.put(label, new Tablet(deviceId, schemaList));
}
Tablet tablet = tabletMap.get(label);
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, now);
rowWrapper.cellStream().forEach(cell -> {
if (CommonConstants.NULL_VALUE.equals(cell.getValue())) {
tablet.addValue(cell.getField().getName(), rowIndex, null);
return;
}
Byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
if (type == CommonConstants.TYPE_NUMBER) {
tablet.addValue(cell.getField().getName(), rowIndex, Double.parseDouble(cell.getValue()));
} else if (type == CommonConstants.TYPE_STRING) {
tablet.addValue(cell.getField().getName(), rowIndex, cell.getValue());
}
});
}
for (Tablet tablet : tabletMap.values()) {
this.sessionPool.insertTablet(tablet, true);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
for (Tablet tablet : tabletMap.values()) {
tablet.reset();
}
tabletMap.clear();
}
}