in pkg/sls-plugin.go [471:604]
func (ds *SlsDatasource) getMetricLogs(_ chan Result, query backend.DataQuery, queryInfo *QueryInfo, logSource *LogSource, response backend.DataResponse, frames *data.Frames) error {
var logStore string
if queryInfo.LogStore != "" {
logStore = queryInfo.LogStore
} else {
logStore = logSource.LogStore
}
project := logSource.Project
endpoint := logSource.Endpoint
headers := logSource.Headers
metricStore := logStore
queryType := queryInfo.QueryType
intervalMs := queryInfo.IntervalMs
// 判断如果 IntervalMs <= 0 则设置为 15000
if intervalMs <= 0 {
intervalMs = 15000
}
// 根据 queryType 动态调整 URL 的路径部分
var apiPath string
if queryType == "instant" {
apiPath = "query"
} else {
apiPath = "query_range"
}
baseUrl := fmt.Sprintf("https://%s.%s/prometheus/%s/%s/api/v1/%s", project, endpoint, project, metricStore, apiPath)
from := query.TimeRange.From.Unix()
to := query.TimeRange.To.Unix()
queryStr := queryInfo.Query
log.DefaultLogger.Debug("getMetricLogs-intervalMs", "intervalMs", intervalMs)
step := (time.Duration(intervalMs) * time.Millisecond).String()
if queryInfo.Step != "" {
step = queryInfo.Step
}
urlVal := url.Values{}
var uri string
switch apiPath {
case "query":
urlVal.Add("query", queryStr)
urlVal.Add("time", strconv.FormatInt(to, 10))
uri = fmt.Sprintf("%s?%v", baseUrl, urlVal.Encode())
case "query_range":
urlVal.Add("query", queryStr)
urlVal.Add("start", strconv.FormatInt(from, 10))
urlVal.Add("end", strconv.FormatInt(to, 10))
// 判断 step 是不是空 则使用默认值
urlVal.Add("step", step)
uri = fmt.Sprintf("%s?%v", baseUrl, urlVal.Encode())
}
log.DefaultLogger.Debug("getMetricLogs-URL", "uri", uri)
// 创建 HTTP 请求
req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(logSource.AccessKeyId, logSource.AccessKeySecret)
// 判断 headers 是否存在,存在则添加
if len(headers) > 0 {
for _, header := range headers {
req.Header.Set(header.Name, header.Value)
}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.DefaultLogger.Error("getMetricLogs ", "queryInfo: ", queryInfo, "error ", err)
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) // 需要读完body内容。
if err != nil {
log.DefaultLogger.Debug("getMetricLogs-read-body-err", "err", err)
return err
}
// 拿到 body 中的 data
// 解析 JSON 数据到结构体
var metricLogs MetricLogs
if err := json.Unmarshal([]byte(body), &metricLogs); err != nil {
log.DefaultLogger.Debug("parse-body-err", "err", err)
return err
}
for _, metricData := range metricLogs.Data.Result {
var frame *data.Frame
if compatible {
frame = data.NewFrame("response")
} else {
frame = data.NewFrame("")
}
var times []time.Time = make([]time.Time, 0)
var values []float64 = make([]float64, 0)
if apiPath == "query_range" {
// 遍历result.values,将values中的数据添加到frame.Field
for _, value := range metricData.Values {
metricValue, err := strconv.ParseFloat(value[1].(string), 64)
if err != nil {
log.DefaultLogger.Debug("ParseFloat-metricData", "ParseFloat", err, "metricValue", metricValue)
}
times = append(times, time.Unix(int64(value[0].(float64)), 0))
values = append(values, metricValue)
}
} else {
var instantValue []interface{} = metricData.Value
metricValue, err := strconv.ParseFloat(instantValue[1].(string), 64)
if err != nil {
log.DefaultLogger.Debug("ParseFloat-metricData", "ParseFloat", err, "metricValue", metricValue)
}
times = append(times, time.Unix(int64(instantValue[0].(float64)), 0))
values = append(values, metricValue)
}
frame.Fields = append(frame.Fields, data.NewField("time", nil, times))
frame.Fields = append(frame.Fields, data.NewField("value", metricData.Metric, values).SetConfig(&data.FieldConfig{
DisplayNameFromDS: formatDisplayName(queryInfo.LegendFormat, metricData.Metric, queryInfo.Query),
}))
*frames = append(*frames, frame)
}
return nil
}