public void saveData()

in hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineDataStorage.java [171:315]


    /**
     * Writes one batch of collected metrics into TDengine.
     *
     * <p>Nothing is written when the server is unavailable, the collection code is not
     * {@code SUCCESS}, or the batch carries no values. Each row becomes one
     * {@code (ts, 'labelsJson', v1, v2, ...)} tuple; the timestamp is the batch time plus the
     * row's position so rows of the same batch do not share a timestamp. If the insert fails
     * with {@code NO_SUPER_TABLE_ERROR}, the super table is created from the metric's field
     * definitions and the insert is retried once on the same statement.
     *
     * <p>Failures are logged and never propagated to the caller.
     *
     * @param metricsData the collected metrics batch to persist
     */
    public void saveData(CollectRep.MetricsData metricsData) {
        if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
            return;
        }
        if (metricsData.getValues().isEmpty()) {

            if (log.isInfoEnabled()) {
                log.info("[warehouse tdengine] flush metrics data {} is null, ignore.", metricsData.getId());
            }

            return;
        }

        String monitorId = String.valueOf(metricsData.getId());
        String superTable = metricsData.getApp() + "_" + metricsData.getMetrics() + "_super";
        String table = metricsData.getApp() + "_" + metricsData.getMetrics() + "_" + monitorId;
        StringBuilder sqlBuffer = new StringBuilder();
        int i = 0;

        try {
            RowWrapper rowWrapper = metricsData.readRow();

            while (rowWrapper.hasNextRow()) {
                rowWrapper = rowWrapper.nextRow();

                // Cell values are collected separately from the row prefix so the labels JSON
                // (only known after all cells were visited) can be placed in front of them
                // without routing collected values through String.format, where a literal '%'
                // in a value would be parsed as a format specifier.
                StringBuilder cellValues = new StringBuilder();
                Map<String, String> labels = Maps.newHashMapWithExpectedSize(8);
                AtomicInteger index = new AtomicInteger(-1);
                int fieldMaxSize = rowWrapper.getFieldList().size();

                rowWrapper.cellStream().forEach(cell -> {
                    index.getAndIncrement();
                    String value = cell.getValue();
                    final int fieldType = cell.getMetadataAsByte(MetricDataConstants.TYPE);
                    boolean isNull = CommonConstants.NULL_VALUE.equals(value);

                    if (fieldType == CommonConstants.TYPE_NUMBER || fieldType == CommonConstants.TYPE_TIME) {
                        // number data
                        if (isNull) {
                            cellValues.append("NULL");
                        } else {
                            try {
                                double number = Double.parseDouble(value);
                                cellValues.append(number);
                            } catch (Exception e) {

                                if (log.isWarnEnabled()) {
                                    log.warn(e.getMessage());
                                }

                                // unparsable numbers are stored as NULL instead of failing the row
                                cellValues.append("NULL");
                            }
                        }
                    } else {
                        // string
                        if (isNull) {
                            cellValues.append("NULL");
                        } else {
                            cellValues.append("'").append(formatStringValue(value)).append("'");
                        }
                    }

                    if (cell.getMetadataAsBoolean(MetricDataConstants.LABEL) && !isNull) {
                        labels.put(cell.getField().getName(), formatStringValue(value));
                    }
                    if (index.get() != fieldMaxSize - 1) {
                        cellValues.append(", ");
                    }
                });

                sqlBuffer.append(" (")
                        .append(metricsData.getTime() + i++).append(", ")
                        .append("'").append(formatStringValue(JsonUtil.toJson(labels))).append("', ")
                        .append(cellValues)
                        .append(")");
            }

            String insertDataSql = String.format(INSERT_TABLE_DATA_SQL, table, superTable, monitorId, sqlBuffer);

            if (log.isDebugEnabled()) {
                log.debug(insertDataSql);
            }

            // try-with-resources closes the statement and then the connection exactly once,
            // including when acquiring the connection or creating the statement fails.
            try (Connection connection = hikariDataSource.getConnection();
                 Statement statement = connection.createStatement()) {
                try {
                    statement.execute(insertDataSql);
                } catch (Exception insertException) {
                    String message = insertException.getMessage();

                    if (message != null && message.contains(NO_SUPER_TABLE_ERROR)) {
                        // stable not exists, create it and retry the insert once
                        String createTableSql = buildCreateSuperTableSql(superTable, metricsData);
                        try {
                            if (log.isInfoEnabled()) {
                                log.info("[tdengine-data]: create {} use sql: {}.", superTable, createTableSql);
                            }

                            statement.execute(createTableSql);
                            statement.execute(insertDataSql);
                        } catch (Exception createTableException) {
                            if (log.isErrorEnabled()) {
                                log.error(message, createTableException);
                            }
                        }
                    } else {
                        if (log.isErrorEnabled()) {
                            log.error(message);
                        }
                    }
                }
            } catch (Exception connectionException) {
                // reached when the connection/statement cannot be obtained or closing fails
                if (log.isErrorEnabled()) {
                    log.error(connectionException.getMessage());
                }
            }
        } catch (Exception e) {
            log.error("[warehouse tdEngine]--Error: {}", e.getMessage(), e);
        }

    }

    /**
     * Builds the statement that creates the super table for a metric.
     *
     * <p>The column list always starts with {@code ts TIMESTAMP} and an {@code instance} NCHAR
     * column, followed by one column per metric field: DOUBLE for number/time fields and
     * NCHAR for everything else. Separators are written in front of each field column so an
     * empty field list does not leave a dangling comma.
     *
     * @param superTable  name of the super table to create
     * @param metricsData metrics batch whose field definitions describe the columns
     * @return the SQL produced by formatting {@code CREATE_SUPER_TABLE_SQL}
     */
    private String buildCreateSuperTableSql(String superTable, CollectRep.MetricsData metricsData) {
        StringBuilder fieldSqlBuilder = new StringBuilder("(");
        fieldSqlBuilder.append("ts TIMESTAMP, ");
        fieldSqlBuilder.append("instance NCHAR(").append(tableStrColumnDefineMaxLength).append(")");

        for (CollectRep.Field field : metricsData.getFields()) {
            final int fieldType = field.getType();

            fieldSqlBuilder.append(", `").append(field.getName()).append("` ");
            if (fieldType == CommonConstants.TYPE_NUMBER || fieldType == CommonConstants.TYPE_TIME) {
                fieldSqlBuilder.append("DOUBLE");
            } else {
                fieldSqlBuilder.append("NCHAR(").append(tableStrColumnDefineMaxLength).append(")");
            }
        }

        fieldSqlBuilder.append(")");
        return String.format(CREATE_SUPER_TABLE_SQL, superTable, fieldSqlBuilder);
    }