in hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineDataStorage.java [171:315]
/**
 * Writes one batch of collected metrics into TDengine.
 *
 * <p>The batch is skipped when the server is not available, when the collect code is not
 * {@code SUCCESS}, or when it carries no values. Otherwise every row is rendered into a single
 * insert statement built from {@code INSERT_TABLE_DATA_SQL}. If that insert fails with an error
 * whose message contains {@code NO_SUPER_TABLE_ERROR}, the super table is created from the
 * metrics field definitions and the insert is executed once more.
 *
 * <p>Failures are logged and never propagated to the caller.
 *
 * @param metricsData collected metrics to persist
 */
public void saveData(CollectRep.MetricsData metricsData) {
    if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
        return;
    }
    if (metricsData.getValues().isEmpty()) {
        if (log.isInfoEnabled()) {
            log.info("[warehouse tdengine] flush metrics data {} is null, ignore.", metricsData.getId());
        }
        return;
    }
    String monitorId = String.valueOf(metricsData.getId());
    String superTable = metricsData.getApp() + "_" + metricsData.getMetrics() + "_super";
    String table = metricsData.getApp() + "_" + metricsData.getMetrics() + "_" + monitorId;
    try {
        StringBuilder rowsSql = new StringBuilder();
        // Each row gets its own timestamp (collect time + row offset) so that rows of the
        // same batch do not share one timestamp value.
        int rowOffset = 0;
        RowWrapper rowWrapper = metricsData.readRow();
        while (rowWrapper.hasNextRow()) {
            rowWrapper = rowWrapper.nextRow();
            rowsSql.append(" ").append(buildRowSql(rowWrapper, metricsData.getTime() + rowOffset++));
        }
        String insertDataSql = String.format(INSERT_TABLE_DATA_SQL, table, superTable, monitorId, rowsSql);
        if (log.isDebugEnabled()) {
            log.debug(insertDataSql);
        }
        // try-with-resources closes the statement and the connection exactly once on every path.
        try (Connection connection = hikariDataSource.getConnection();
             Statement statement = connection.createStatement()) {
            try {
                statement.execute(insertDataSql);
            } catch (Exception insertException) {
                String message = insertException.getMessage();
                if (message == null || !message.contains(NO_SUPER_TABLE_ERROR)) {
                    // Not a missing super table: let the outer handler log it with its stack trace.
                    throw insertException;
                }
                // Super table does not exist yet: create it, then retry the insert once.
                String createTableSql = String.format(CREATE_SUPER_TABLE_SQL, superTable,
                        buildSuperTableFieldsSql(metricsData));
                if (log.isInfoEnabled()) {
                    log.info("[tdengine-data]: create {} use sql: {}.", superTable, createTableSql);
                }
                statement.execute(createTableSql);
                statement.execute(insertDataSql);
            }
        }
    } catch (Exception e) {
        log.error("[warehouse tdEngine]--Error: {}", e.getMessage(), e);
    }
}

/**
 * Renders one row as a parenthesised SQL value tuple: timestamp, labels JSON, then one value
 * per cell.
 *
 * <p>The tuple is assembled by plain concatenation. Collected values must never be used as a
 * {@link String#format} pattern, because a value containing {@code %} would be interpreted as
 * a format specifier.
 *
 * @param row       row positioned on the record to render
 * @param timestamp timestamp value written as the first column of the tuple
 * @return the value tuple, e.g. {@code (ts, 'labels', v1, v2)}
 */
private String buildRowSql(RowWrapper row, long timestamp) {
    Map<String, String> labels = Maps.newHashMapWithExpectedSize(8);
    StringBuilder cellsSql = new StringBuilder();
    row.cellStream().forEach(cell -> {
        String value = cell.getValue();
        int fieldType = cell.getMetadataAsByte(MetricDataConstants.TYPE);
        boolean missing = value == null || CommonConstants.NULL_VALUE.equals(value);
        // Every cell is prefixed with the separator; the tuple always starts with ts and labels.
        cellsSql.append(", ");
        if (missing) {
            cellsSql.append("NULL");
        } else if (fieldType == CommonConstants.TYPE_NUMBER || fieldType == CommonConstants.TYPE_TIME) {
            // number data
            try {
                cellsSql.append(Double.parseDouble(value));
            } catch (NumberFormatException e) {
                if (log.isWarnEnabled()) {
                    log.warn(e.getMessage());
                }
                cellsSql.append("NULL");
            }
        } else {
            // string
            cellsSql.append("'").append(formatStringValue(value)).append("'");
        }
        if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL) && !missing) {
            labels.put(cell.getField().getName(), formatStringValue(value));
        }
    });
    return "(" + timestamp + ", '" + formatStringValue(JsonUtil.toJson(labels)) + "'" + cellsSql + ")";
}

/**
 * Builds the column definition list used to create the super table: the {@code ts} and
 * {@code instance} columns followed by one column per metrics field ({@code DOUBLE} for number
 * and time fields, {@code NCHAR} otherwise).
 *
 * @param metricsData metrics whose field definitions describe the columns
 * @return the parenthesised column definitions
 */
private String buildSuperTableFieldsSql(CollectRep.MetricsData metricsData) {
    StringBuilder fieldsSql = new StringBuilder("(");
    fieldsSql.append("ts TIMESTAMP, ");
    fieldsSql.append("instance NCHAR(").append(tableStrColumnDefineMaxLength).append(")");
    for (CollectRep.Field field : metricsData.getFields()) {
        final int fieldType = field.getType();
        fieldsSql.append(", `").append(field.getName()).append("` ");
        if (fieldType == CommonConstants.TYPE_NUMBER || fieldType == CommonConstants.TYPE_TIME) {
            fieldsSql.append("DOUBLE");
        } else {
            fieldsSql.append("NCHAR(").append(tableStrColumnDefineMaxLength).append(")");
        }
    }
    fieldsSql.append(")");
    return fieldsSql.toString();
}