pkg/sls-plugin.go (986 lines of code) (raw):
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// Make sure SampleDatasource implements required interfaces. This is important to do
// since otherwise we will only get a not implemented error response from plugin in
// runtime. In this example datasource instance implements backend.QueryDataHandler,
// backend.CheckHealthHandler, backend.StreamHandler interfaces. Plugin should not
// implement all these interfaces - only those which are required for a particular task.
// For example if plugin does not need streaming functionality then you are free to remove
// methods that implement backend.StreamHandler. Implementing instancemgmt.InstanceDisposer
// is useful to clean up resources used by previous datasource instance when a new datasource
// instance created upon datasource settings changed.
// Compile-time assertions: the build fails if *SlsDatasource stops satisfying
// any of the plugin SDK contracts it is registered for below.
var (
	_ backend.QueryDataHandler      = (*SlsDatasource)(nil)
	_ backend.CheckHealthHandler    = (*SlsDatasource)(nil)
	_ backend.StreamHandler         = (*SlsDatasource)(nil)
	_ instancemgmt.InstanceDisposer = (*SlsDatasource)(nil)
)
// NewSLSDatasource wires a single SlsDatasource instance up as the query,
// health-check and resource handler and returns the resulting ServeOpts.
func NewSLSDatasource() datasource.ServeOpts {
	instance := &SlsDatasource{}
	return datasource.ServeOpts{
		QueryDataHandler:    instance,
		CheckHealthHandler:  instance,
		CallResourceHandler: newResourceHandler(instance),
	}
}
// SlsDatasource is the Aliyun SLS (Simple Log Service) backend datasource.
// It answers data queries, reports health, and exposes streaming hooks.
// The embedded instance settings are not read by the visible code — per-request
// configuration is loaded via LoadSettings from the plugin context instead;
// presumably the embedding exists for SDK compatibility (TODO confirm).
type SlsDatasource struct {
	backend.DataSourceInstanceSettings
}
// Dispose tells the plugin SDK that the plugin wants to clean up resources when
// a new instance is created. As soon as a datasource settings change is
// detected by the SDK, the old instance is disposed and a new one is built.
func (ds *SlsDatasource) Dispose() {
	// No per-instance resources are held, so there is nothing to release.
}
// QueryData handles multiple queries and returns multiple responses.
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier).
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
//
// Concurrency: one QueryLogs goroutine runs per query, each sending exactly one
// Result on ch; a single collector goroutine copies results into response
// (avoiding concurrent map writes) and releases the WaitGroup. wg.Wait blocks
// until every query has been collected, then the deferred close(ch) lets the
// collector goroutine exit.
func (ds *SlsDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (res *backend.QueryDataResponse, err error) {
	//log.DefaultLogger.Info("QueryData called", "request", req)
	config, err := LoadSettings(req.PluginContext)
	if err != nil {
		return
	}
	// A fresh SLS client per request, built from the stored credentials.
	provider := sls.NewStaticCredentialsProvider(config.AccessKeyId, config.AccessKeySecret, "")
	client := sls.CreateNormalInterfaceV2(config.Endpoint, provider)
	client.SetUserAgent("grafana-go")
	if config.Region != "" {
		client.SetAuthVersion(sls.AuthV4)
		client.SetRegion(config.Region)
	}
	// create response struct
	response := backend.NewQueryDataResponse()
	queries := req.Queries
	// Buffered so QueryLogs senders never block even if the collector lags.
	ch := make(chan Result, len(queries))
	defer func() {
		close(ch)
		client = nil
		config = nil
		// Convert a panic in this function into the named error return.
		if r := recover(); r != nil {
			switch r.(type) {
			case string:
				err = errors.New(r.(string))
			case error:
				err = r.(error)
			}
			log.DefaultLogger.Error("QueryData recover", "error", err)
		}
	}()
	log.DefaultLogger.Debug("len(queries)", "len", len(queries))
	wg := sync.WaitGroup{}
	for _, query := range queries {
		wg.Add(1)
		log.DefaultLogger.Debug("range_queries", "RefID", query.RefID,
			"JSON", query.JSON, "QueryType", query.QueryType)
		go ds.QueryLogs(ch, query, client, config)
	}
	// Collector: serializes writes into response.Responses and counts down wg.
	// NOTE(review): this relies on QueryLogs sending exactly one Result per
	// query — a double send would make wg.Done panic; verify QueryLogs paths.
	go func(chan Result) {
		for res := range ch {
			response.Responses[res.refId] = res.dataResponse
			wg.Done()
		}
	}(ch)
	wg.Wait()
	return response, nil
}
// CheckHealth handles health checks sent from Grafana to the plugin — mainly
// the test button on the datasource configuration page. It validates the
// stored settings by fetching the configured SLS project.
func (ds *SlsDatasource) CheckHealth(_ context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	log.DefaultLogger.Debug("CheckHealth called", "request", req)
	config, err := LoadSettings(req.PluginContext)
	if err != nil {
		return nil, err
	}
	provider := sls.NewStaticCredentialsProvider(config.AccessKeyId, config.AccessKeySecret, "")
	client := sls.CreateNormalInterfaceV2(config.Endpoint, provider)
	client.SetUserAgent("grafana-go")
	if config.Region != "" {
		client.SetAuthVersion(sls.AuthV4)
		client.SetRegion(config.Region)
	}
	log.DefaultLogger.Debug("CheckHealth", "project", config.Project, "client", client)
	// Probe the configured project; any error means the settings are unusable.
	if _, err := client.GetProject(config.Project); err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: err.Error(),
		}, nil
	}
	return &backend.CheckHealthResult{
		Status:  backend.HealthStatusOk,
		Message: "Data source is working",
	}, nil
}
// SubscribeStream is called when a client wants to connect to a stream. This
// callback allows sending the first message.
func (ds *SlsDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
	log.DefaultLogger.Debug("SubscribeStream called", "request", req)
	resp := &backend.SubscribeStreamResponse{
		Status: backend.SubscribeStreamStatusPermissionDenied,
	}
	// Only the well-known "stream" path may be subscribed to.
	if req.Path == "stream" {
		resp.Status = backend.SubscribeStreamStatusOK
	}
	return resp, nil
}
// RunStream is called once for any open channel. Results are shared with
// everyone subscribed to the same channel. It pushes a one-row (time, values)
// frame once per second until Grafana cancels the context.
func (ds *SlsDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
	log.DefaultLogger.Debug("RunStream called", "request", req)
	// Create the same data frame as for query data; the single row is
	// overwritten in place on every tick.
	frame := data.NewFrame("response")
	frame.Fields = append(frame.Fields,
		data.NewField("time", nil, make([]time.Time, 1)),
		data.NewField("values", nil, make([]int64, 1)),
	)
	// Reuse one ticker instead of calling time.After inside the loop, which
	// allocated a fresh timer on every iteration.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	counter := 0
	// Stream data frames periodically till the stream is closed by Grafana.
	for {
		select {
		case <-ctx.Done():
			log.DefaultLogger.Debug("Context done, finish streaming", "path", req.Path)
			return nil
		case <-ticker.C:
			// Send new data periodically (value alternates between 10 and 20).
			frame.Fields[0].Set(0, time.Now())
			frame.Fields[1].Set(0, int64(10*(counter%2+1)))
			counter++
			if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
				log.DefaultLogger.Error("Error sending frame", "error", err)
				continue
			}
		}
	}
}
// PublishStream is called when a client sends a message to the stream.
func (ds *SlsDatasource) PublishStream(_ context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
	log.DefaultLogger.Debug("PublishStream called", "request", req)
	// Clients are never allowed to publish into the stream.
	resp := &backend.PublishStreamResponse{
		Status: backend.PublishStreamStatusPermissionDenied,
	}
	return resp, nil
}
// SortLogs orders logs in place, ascending by the timestamp parsed (via
// toTime) from each entry's col value.
func (ds *SlsDatasource) SortLogs(logs []map[string]string, col string) {
	sort.Slice(logs, func(a, b int) bool {
		return toTime(logs[a][col]).Unix() < toTime(logs[b][col]).Unix()
	})
}
// QueryLogs runs a single Grafana data query against SLS and always delivers
// exactly one Result keyed by query.RefID on ch — on success, on error, and
// (via the deferred recover) on panic — so QueryData's collector is released.
//
// Metric-store queries are proxied to getMetricLogs; everything else pages
// through GetLogsV2 and is converted into data.Frames by one of the Build*
// helpers, chosen from xcol/ycols.
func (ds *SlsDatasource) QueryLogs(ch chan Result, query backend.DataQuery, client sls.ClientInterface, logSource *LogSource) {
	response := backend.DataResponse{}
	refId := query.RefID
	queryInfo := &QueryInfo{}
	defer func() {
		queryInfo = nil
		// Convert a panic into an error Result so the caller still gets one.
		if r := recover(); r != nil {
			switch r.(type) {
			case string:
				response.Error = errors.New(r.(string))
			case error:
				response.Error = r.(error)
			}
			log.DefaultLogger.Error("QueryLogs recover", "refId", refId, "error", response.Error)
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
		}
	}()
	err := json.Unmarshal(query.JSON, &queryInfo)
	if err != nil {
		log.DefaultLogger.Error("Unmarshal queryInfo", "refId", refId, "error", err)
		response.Error = err
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	xcol := queryInfo.Xcol
	from := query.TimeRange.From.Unix()
	to := query.TimeRange.To.Unix()
	// The per-query logstore overrides the datasource-level default.
	var logStore string
	if queryInfo.LogStore != "" {
		logStore = queryInfo.LogStore
	} else {
		logStore = logSource.LogStore
	}
	// Return an error if no logstore is configured at either level.
	if logStore == "" {
		response.Error = errors.New("logStore is empty, please select a logstore")
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	// Metric-store queries go straight to the Prometheus-compatible API.
	if queryInfo.Type == "metricstore" {
		var metricFrames data.Frames
		err := ds.getMetricLogs(ch, query, queryInfo, logSource, response, &metricFrames)
		if err != nil {
			response.Error = err
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
		} else {
			response.Frames = metricFrames
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
		}
		return
	}
	var ycols []string
	// GetQueryCount also normalizes queryInfo.LogsPerPage / TotalLogs.
	queryCount := ds.GetQueryCount(queryInfo)
	getLogsResp := &sls.GetLogsResponse{}
	// Page through the logstore, accumulating logs into getLogsResp.
	for i := 0; i < int(queryCount); i++ {
		currentPage := queryInfo.CurrentPage
		if currentPage <= 0 {
			currentPage = 1
		}
		offset := (currentPage - 1) * queryInfo.LogsPerPage
		getLogsReq := &sls.GetLogRequest{
			From:     from,
			To:       to,
			Query:    queryInfo.Query,
			Lines:    queryInfo.LogsPerPage,
			Offset:   offset,
			Reverse:  true,
			PowerSQL: queryInfo.PowerSql,
		}
		tem, err := client.GetLogsV2(logSource.Project, logStore, getLogsReq)
		if err != nil {
			log.DefaultLogger.Error("GetLogs ", "query : ", queryInfo.Query, "error ", err)
			response.Error = err
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
			return
		}
		if i == 0 {
			getLogsResp = tem
		} else {
			getLogsResp.Logs = append(getLogsResp.Logs, tem.Logs...)
			getLogsResp.Count = getLogsResp.Count + tem.Count
		}
		// A short page means the result set has been drained.
		if tem.Count < queryInfo.LogsPerPage {
			break
		}
		queryInfo.CurrentPage++
	}
	logs := getLogsResp.Logs
	c := &Contents{}
	err = json.Unmarshal([]byte(getLogsResp.Contents), &c)
	if err != nil {
		log.DefaultLogger.Error("GetLogs ", "Contents : ", getLogsResp.Contents, "error ", err)
		response.Error = err
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	// Partial results are surfaced as an error, but frames are still built.
	if getLogsResp.Progress != "Complete" {
		log.DefaultLogger.Warn("GetLogs ", "Progress: ", getLogsResp.Progress, "warn: ", "incomplete logs, you can switch to powerSql")
		response.Error = errors.New("incomplete logs, you can switch to powerSql")
	}
	keys := c.Keys
	// In compatible mode all whitespace in the ycol spec is stripped wholesale;
	// otherwise each column name is trimmed individually below.
	if compatible {
		queryInfo.Ycol = strings.Replace(queryInfo.Ycol, " ", "", -1)
	}
	// "#:#" separates <label col>#:#<metric cols> and selects the flow graph.
	isFlowGraph := strings.Contains(queryInfo.Ycol, "#:#")
	if isFlowGraph {
		ycols = strings.Split(queryInfo.Ycol, "#:#")
	} else {
		ycols = strings.Split(queryInfo.Ycol, ",")
	}
	if !compatible {
		for i := range ycols {
			ycols[i] = strings.TrimSpace(ycols[i])
		}
	}
	log.DefaultLogger.Debug("QueryLogs", "getLogsResp", getLogsResp)
	getLogsResp = nil
	var frames data.Frames
	if xcol == "trace" {
		log.DefaultLogger.Debug("BuildTrace")
		ds.BuildTrace(logs, &frames)
		response.Frames = frames
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	// Queries without an analysis stage ("|") are raw log listings.
	if !strings.Contains(queryInfo.Query, "|") {
		log.DefaultLogger.Debug("BuildLogs")
		ds.BuildLogs(logs, ycols, &frames)
		response.Frames = frames
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	if isFlowGraph {
		log.DefaultLogger.Debug("flow_graph")
		err = ds.BuildFlowGraph(logs, xcol, ycols, &frames)
		if err != nil {
			response.Error = err
		} else {
			response.Frames = frames
		}
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	// Remaining types dispatch on xcol: the bar/map/pie keywords, any other
	// non-empty column name for a timing graph, or a table as the fallback.
	if xcol == "bar" {
		log.DefaultLogger.Debug("bar")
		ds.BuildBarGraph(logs, ycols, &frames)
	} else if xcol == "map" {
		log.DefaultLogger.Debug("map")
		ds.BuildMapGraph(logs, ycols, &frames)
	} else if xcol == "pie" {
		log.DefaultLogger.Debug("pie")
		ds.BuildPieGraph(logs, ycols, &frames)
	} else if xcol != "" && xcol != "map" && xcol != "pie" && xcol != "bar" && xcol != "table" {
		log.DefaultLogger.Debug("time_graph")
		ds.BuildTimingGraph(logs, xcol, ycols, keys, &frames)
	} else {
		log.DefaultLogger.Debug("table")
		ds.BuildTable(logs, xcol, ycols, keys, &frames)
	}
	response.Frames = frames
	ch <- Result{
		refId:        refId,
		dataResponse: response,
	}
}
// GetQueryCount normalizes the paging parameters on queryInfo (LogsPerPage is
// always forced to 100 first, TotalLogs is clamped to [1, 5000]) and returns
// the number of GetLogs pages needed. Analytic queries — containing both
// SELECT and an analysis pipe "|" — are always fetched in a single request.
func (ds *SlsDatasource) GetQueryCount(queryInfo *QueryInfo) int64 {
	queryInfo.LogsPerPage = 100
	isAnalytic := strings.Contains(strings.ToUpper(queryInfo.Query), "SELECT") &&
		strings.Contains(queryInfo.Query, "|")
	if isAnalytic {
		return 1
	}
	switch {
	case queryInfo.TotalLogs >= 5000:
		queryInfo.TotalLogs = 5000
	case queryInfo.TotalLogs <= 0:
		queryInfo.TotalLogs = 1
	}
	if queryInfo.TotalLogs < queryInfo.LogsPerPage {
		queryInfo.LogsPerPage = queryInfo.TotalLogs
	}
	if queryInfo.LogsPerPage <= 0 {
		return 1
	}
	// Ceiling division: pages required to cover TotalLogs.
	return (queryInfo.TotalLogs + queryInfo.LogsPerPage - 1) / queryInfo.LogsPerPage
}
// getMetricLogs queries the SLS metric store through its Prometheus-compatible
// HTTP API ("query" for instant queries, "query_range" otherwise) and appends
// one frame per returned series to frames. The response parameter is passed by
// value and never sent on the channel here; errors are returned to the caller
// (QueryLogs) instead.
//
// NOTE(review): http.DefaultClient is used without a timeout, and the value
// type assertions below panic on unexpected payloads (caught by QueryLogs's
// deferred recover) — both are worth hardening.
func (ds *SlsDatasource) getMetricLogs(_ chan Result, query backend.DataQuery, queryInfo *QueryInfo, logSource *LogSource, response backend.DataResponse, frames *data.Frames) error {
	// The per-query logstore overrides the datasource-level default.
	var logStore string
	if queryInfo.LogStore != "" {
		logStore = queryInfo.LogStore
	} else {
		logStore = logSource.LogStore
	}
	project := logSource.Project
	endpoint := logSource.Endpoint
	headers := logSource.Headers
	metricStore := logStore
	queryType := queryInfo.QueryType
	intervalMs := queryInfo.IntervalMs
	// Default the panel resolution to 15s when none was supplied.
	if intervalMs <= 0 {
		intervalMs = 15000
	}
	// Pick the Prometheus API path according to the query type.
	var apiPath string
	if queryType == "instant" {
		apiPath = "query"
	} else {
		apiPath = "query_range"
	}
	baseUrl := fmt.Sprintf("https://%s.%s/prometheus/%s/%s/api/v1/%s", project, endpoint, project, metricStore, apiPath)
	from := query.TimeRange.From.Unix()
	to := query.TimeRange.To.Unix()
	queryStr := queryInfo.Query
	log.DefaultLogger.Debug("getMetricLogs-intervalMs", "intervalMs", intervalMs)
	// Step defaults to the interval; an explicit queryInfo.Step wins.
	step := (time.Duration(intervalMs) * time.Millisecond).String()
	if queryInfo.Step != "" {
		step = queryInfo.Step
	}
	urlVal := url.Values{}
	var uri string
	switch apiPath {
	case "query":
		urlVal.Add("query", queryStr)
		urlVal.Add("time", strconv.FormatInt(to, 10))
		uri = fmt.Sprintf("%s?%v", baseUrl, urlVal.Encode())
	case "query_range":
		urlVal.Add("query", queryStr)
		urlVal.Add("start", strconv.FormatInt(from, 10))
		urlVal.Add("end", strconv.FormatInt(to, 10))
		urlVal.Add("step", step)
		uri = fmt.Sprintf("%s?%v", baseUrl, urlVal.Encode())
	}
	log.DefaultLogger.Debug("getMetricLogs-URL", "uri", uri)
	// Build the HTTP request; SLS accepts AccessKey credentials as basic auth.
	req, err := http.NewRequest(http.MethodGet, uri, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.SetBasicAuth(logSource.AccessKeyId, logSource.AccessKeySecret)
	// Forward any extra headers configured on the datasource.
	if len(headers) > 0 {
		for _, header := range headers {
			req.Header.Set(header.Name, header.Value)
		}
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.DefaultLogger.Error("getMetricLogs ", "queryInfo: ", queryInfo, "error ", err)
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body) // drain the body fully so the connection can be reused
	if err != nil {
		log.DefaultLogger.Debug("getMetricLogs-read-body-err", "err", err)
		return err
	}
	// Decode the Prometheus-style JSON payload.
	var metricLogs MetricLogs
	if err := json.Unmarshal([]byte(body), &metricLogs); err != nil {
		log.DefaultLogger.Debug("parse-body-err", "err", err)
		return err
	}
	// One frame per series: a time field plus a value field labelled with the
	// series' metric labels and a legend-formatted display name.
	for _, metricData := range metricLogs.Data.Result {
		var frame *data.Frame
		if compatible {
			frame = data.NewFrame("response")
		} else {
			frame = data.NewFrame("")
		}
		var times []time.Time = make([]time.Time, 0)
		var values []float64 = make([]float64, 0)
		if apiPath == "query_range" {
			// Range results: each value is a [timestamp, "number"] pair.
			for _, value := range metricData.Values {
				metricValue, err := strconv.ParseFloat(value[1].(string), 64)
				if err != nil {
					log.DefaultLogger.Debug("ParseFloat-metricData", "ParseFloat", err, "metricValue", metricValue)
				}
				times = append(times, time.Unix(int64(value[0].(float64)), 0))
				values = append(values, metricValue)
			}
		} else {
			// Instant results carry a single [timestamp, "number"] pair.
			var instantValue []interface{} = metricData.Value
			metricValue, err := strconv.ParseFloat(instantValue[1].(string), 64)
			if err != nil {
				log.DefaultLogger.Debug("ParseFloat-metricData", "ParseFloat", err, "metricValue", metricValue)
			}
			times = append(times, time.Unix(int64(instantValue[0].(float64)), 0))
			values = append(values, metricValue)
		}
		frame.Fields = append(frame.Fields, data.NewField("time", nil, times))
		frame.Fields = append(frame.Fields, data.NewField("value", metricData.Metric, values).SetConfig(&data.FieldConfig{
			DisplayNameFromDS: formatDisplayName(queryInfo.LegendFormat, metricData.Metric, queryInfo.Query),
		}))
		*frames = append(*frames, frame)
	}
	return nil
}
// BuildFlowGraphV2 builds flow-graph frames for the "#:#" ycols form when the
// label column (ycols[0]) is empty: series are grouped by every log key that
// is not a metric, __source__, __time__, or the x column. One frame is emitted
// per distinct label combination, holding a Time field plus one value field
// per metric name listed (comma-separated) in ycols[1].
func (ds *SlsDatasource) BuildFlowGraphV2(logs []map[string]string, xcol string, ycols []string, frames *data.Frames) {
	if len(logs) == 0 {
		return
	}
	ds.SortLogs(logs, xcol)
	metricNames := strings.Split(ycols[1], ",")
	metricMap := make(map[string]bool)
	for _, n := range metricNames {
		metricMap[n] = true
	}
	// Label columns are derived from the first log line only.
	// NOTE(review): assumes every log line shares the first line's key set.
	var labelNames []string
	for k := range logs[0] {
		if k != "__source__" && k != "__time__" && !metricMap[k] && k != xcol {
			labelNames = append(labelNames, k)
		}
	}
	sort.Strings(labelNames)
	// All three maps are keyed by the concatenated label values of a series.
	timeFields := make(map[string][]time.Time)
	nameMetricFields := make(map[string]map[string][]float64)
	for _, n := range metricNames {
		nameMetricFields[n] = make(map[string][]float64)
	}
	frameLabelsMap := make(map[string]map[string]string)
	for _, alog := range logs {
		timeVal := alog[xcol]
		labels := map[string]string{}
		labelsKey := ""
		for _, l := range labelNames {
			labels[l] = alog[l]
			labelsKey += alog[l]
		}
		if _, ok := frameLabelsMap[labelsKey]; !ok {
			frameLabelsMap[labelsKey] = labels
		}
		if _, ok := timeFields[labelsKey]; ok {
			timeFields[labelsKey] = append(timeFields[labelsKey], toTime(timeVal))
		} else {
			timeFields[labelsKey] = []time.Time{toTime(timeVal)}
		}
		for _, n := range metricNames {
			metricVal := alog[n]
			floatV, err := strconv.ParseFloat(metricVal, 64)
			if err != nil {
				log.DefaultLogger.Debug("BuildFlowGraphV2", "ParseFloat", err, "value", metricVal)
			}
			if _, ok := nameMetricFields[n][labelsKey]; ok {
				nameMetricFields[n][labelsKey] = append(nameMetricFields[n][labelsKey], floatV)
			} else {
				nameMetricFields[n][labelsKey] = []float64{floatV}
			}
		}
	}
	// Emit one frame per series; a single metric is named "Value" for Grafana.
	for k, v := range timeFields {
		frame := data.NewFrame("")
		frame.Fields = append(frame.Fields, data.NewField("Time", nil, v))
		if len(metricNames) == 1 {
			frame.Fields = append(frame.Fields, data.NewField("Value", frameLabelsMap[k], nameMetricFields[metricNames[0]][k]))
		} else {
			for _, n := range metricNames {
				frame.Fields = append(frame.Fields, data.NewField(n, frameLabelsMap[k], nameMetricFields[n][k]))
			}
		}
		*frames = append(*frames, frame)
	}
}
// BuildFlowGraph converts analytic results into a single wide frame for flow
// graphs: ycols[0] names the series-label column, ycols[1] the numeric value
// column, and xcol the time column. Missing (label, time) combinations are
// zero-filled so every series has exactly one value per timestamp; for
// duplicate (time, label) pairs the first value wins. When the label column is
// empty, the work is delegated to BuildFlowGraphV2. An error is returned when
// the label×time matrix would exceed maxPointsLimit.
func (ds *SlsDatasource) BuildFlowGraph(logs []map[string]string, xcol string, ycols []string, frames *data.Frames) (err error) {
	// Nothing to plot / not enough columns. (The previous `frames =
	// &data.Frames{}` here only rebound the local pointer — a no-op.)
	if len(logs) == 0 || len(ycols) < 2 {
		return
	}
	if ycols[0] == "" && ycols[1] != "" {
		ds.BuildFlowGraphV2(logs, xcol, ycols, frames)
		return
	}
	var frame *data.Frame
	if compatible {
		frame = data.NewFrame("response")
	} else {
		frame = data.NewFrame("")
	}
	// fieldMap[labelIndex][unixTime] = value, zero-filled below.
	fieldMap := make(map[int]map[int64]float64)
	timeSet := make(map[int64]bool)
	labelSet := make(map[string]bool)
	var labelArr []string
	// Collect distinct labels (in first-seen order) and distinct timestamps.
	for _, alog := range logs {
		if !labelSet[alog[ycols[0]]] {
			labelArr = append(labelArr, alog[ycols[0]])
		}
		timeSet[toTime(alog[xcol]).Unix()] = true
		labelSet[alog[ycols[0]]] = true
	}
	var timeArr []int64
	for k := range timeSet {
		timeArr = append(timeArr, k)
	}
	sort.Slice(timeArr, func(i, j int) bool {
		return timeArr[i] < timeArr[j]
	})
	fieldSet := make(map[string]bool)
	labelToIndex := make(map[string]int)
	for i := range labelArr {
		fieldMap[i] = map[int64]float64{}
		labelToIndex[labelArr[i]] = i
	}
	// Refuse to densify absurdly large label×time matrices.
	if len(labelArr)*len(timeArr) > maxPointsLimit {
		s := fmt.Sprintf("BuildFlowGraph, More than %d points : %d", maxPointsLimit, len(labelArr)*len(timeArr))
		log.DefaultLogger.Error(s)
		err = errors.New(s)
		return
	}
	// Zero-fill so each series is defined at every timestamp.
	for i := range labelArr {
		for _, t0 := range timeArr {
			fieldMap[i][t0] = 0
		}
	}
	for _, alog := range logs {
		label := alog[ycols[0]]
		t := alog[xcol]
		if !fieldSet[t+label] {
			fieldSet[t+label] = true
			floatV, err1 := strconv.ParseFloat(alog[ycols[1]], 64)
			if err1 != nil {
				// Log the actual parse error (previously the outer, always-nil
				// err was logged here, hiding the failure).
				log.DefaultLogger.Debug("BuildFlowGraph", "ParseFloat", err1, "value", alog[ycols[1]])
			}
			fieldMap[labelToIndex[label]][toTime(t).Unix()] = floatV
		}
	}
	// Emit one field per label; only series matching the first series' length
	// are kept so the frame stays rectangular.
	var frameLen int
	for i, k := range labelArr {
		v := fieldMap[i]
		if len(v) > 0 {
			if frameLen == 0 {
				frameLen = len(v)
			}
			if len(v) == frameLen {
				arr := mapToSlice(timeArr, v)
				frame.Fields = append(frame.Fields, data.NewField(k, nil, arr))
			}
		}
	}
	var times []time.Time
	for _, k := range timeArr {
		times = append(times, time.Unix(k, 0))
	}
	if len(times) == frameLen {
		frame.Fields = append([]*data.Field{data.NewField("time", nil, times)}, frame.Fields...)
	}
	*frames = append(*frames, frame)
	return
}
// BuildBarGraph renders logs for a bar chart: ycols[0] becomes a string field
// of category labels and every remaining ycol becomes a numeric field.
// Unparseable numbers are logged and recorded as 0.
func (ds *SlsDatasource) BuildBarGraph(logs []map[string]string, ycols []string, frames *data.Frames) {
	frame := data.NewFrame("response")
	numMap := make(map[string][]float64)
	for _, ycol := range ycols[1:] {
		numMap[ycol] = make([]float64, 0)
	}
	strKey := ycols[0]
	var strArr []string
	for _, alog := range logs {
		for key, raw := range alog {
			if column, ok := numMap[key]; ok {
				parsed, parseErr := strconv.ParseFloat(raw, 64)
				if parseErr != nil {
					log.DefaultLogger.Debug("BuildBarGraph", "ParseFloat", parseErr, "value", raw)
				}
				numMap[key] = append(column, parsed)
			}
			if key == strKey {
				strArr = append(strArr, raw)
			}
		}
	}
	frame.Fields = append(frame.Fields, data.NewField(strKey, nil, strArr))
	for _, ycol := range ycols[1:] {
		frame.Fields = append(frame.Fields, data.NewField(ycol, nil, numMap[ycol]))
	}
	*frames = append(*frames, frame)
}
// BuildMapGraph renders logs for a map-style panel: every ycol except the last
// becomes a string field and the last ycol becomes the numeric value field.
// Unparseable numbers are logged and recorded as 0.
func (ds *SlsDatasource) BuildMapGraph(logs []map[string]string, ycols []string, frames *data.Frames) {
	frame := data.NewFrame("response")
	strMap := make(map[string][]string)
	strCols := ycols[:len(ycols)-1]
	for _, ycol := range strCols {
		strMap[ycol] = make([]string, 0)
	}
	numKey := ycols[len(ycols)-1]
	var numArr []float64
	for _, alog := range logs {
		for k, v := range alog {
			if strMap[k] != nil {
				strMap[k] = append(strMap[k], v)
			}
			if k == numKey {
				floatV, err := strconv.ParseFloat(v, 64)
				if err != nil {
					log.DefaultLogger.Debug("BuildMapGraph", "ParseFloat", err, "value", v)
				}
				numArr = append(numArr, floatV)
			}
		}
	}
	// Emit string fields in ycols order: ranging over strMap here made the
	// field order nondeterministic (Go map iteration order is randomized).
	for _, ycol := range strCols {
		frame.Fields = append(frame.Fields, data.NewField(ycol, nil, strMap[ycol]))
	}
	frame.Fields = append(frame.Fields, data.NewField(numKey, nil, numArr))
	*frames = append(*frames, frame)
}
// BuildPieGraph renders logs for a pie chart: one numeric field per distinct
// value of the label column ycols[0], holding the values parsed from ycols[1].
// Unparseable numbers are logged and recorded as 0.
//
// Labels are deduplicated in a single pass. The previous implementation kept
// every occurrence in labelArr and rescanned all logs per occurrence, so a
// label appearing m times produced m duplicate fields sharing m² values and
// made the whole build O(n²).
func (ds *SlsDatasource) BuildPieGraph(logs []map[string]string, ycols []string, frames *data.Frames) {
	if len(ycols) < 2 {
		return
	}
	frame := data.NewFrame("response")
	fieldMap := make(map[string][]float64)
	var labelArr []string // distinct labels in first-seen order
	for _, alog := range logs {
		label := alog[ycols[0]]
		if _, seen := fieldMap[label]; !seen {
			labelArr = append(labelArr, label)
			fieldMap[label] = make([]float64, 0)
		}
		floatV, err := strconv.ParseFloat(alog[ycols[1]], 64)
		if err != nil {
			log.DefaultLogger.Debug("BuildPieGraph", "ParseFloat", err, "value", alog[ycols[1]])
		}
		fieldMap[label] = append(fieldMap[label], floatV)
	}
	for _, v := range labelArr {
		frame.Fields = append(frame.Fields, data.NewField(v, nil, fieldMap[v]))
	}
	*frames = append(*frames, frame)
}
// BuildTimingGraph renders a time-series frame: xcol names the time column and
// each ycol becomes a nullable float field (the literal string "null" is kept
// as a nil pointer so the panel can show gaps). With a single empty ycol the
// server-reported keys are used instead.
func (ds *SlsDatasource) BuildTimingGraph(logs []map[string]string, xcol string, ycols []string, keys []string, frames *data.Frames) {
	ds.SortLogs(logs, xcol)
	var frame *data.Frame
	if compatible {
		frame = data.NewFrame("response")
	} else {
		frame = data.NewFrame("")
	}
	fieldMap := make(map[string][]*float64)
	var times []time.Time
	// An empty ycol selection falls back to the result's key list.
	if len(ycols) == 1 && ycols[0] == "" && len(keys) > 0 {
		ycols = keys
	}
	for _, v := range ycols {
		if v != xcol {
			fieldMap[v] = make([]*float64, 0)
		}
	}
	for _, alog := range logs {
		for k, v := range alog {
			if fieldMap[k] != nil {
				// Keep "null" as a nil pointer so the graph shows a gap.
				if v == "null" {
					fieldMap[k] = append(fieldMap[k], nil)
				} else {
					floatV, err := strconv.ParseFloat(v, 64)
					if err != nil {
						log.DefaultLogger.Debug("BuildTimingGraph", "ParseFloat", err, "value", v)
					}
					fieldMap[k] = append(fieldMap[k], &floatV)
				}
			}
			if xcol != "" && xcol == k {
				times = append(times, toTime(v))
			}
		}
	}
	// Only emit columns whose length matches the longest one, so the frame
	// stays rectangular when some log lines are missing keys.
	var frameLen int
	for _, v := range fieldMap {
		if len(v) > frameLen {
			frameLen = len(v)
		}
	}
	if len(times) == frameLen {
		frame.Fields = append(frame.Fields, data.NewField("time", nil, times))
	}
	for _, v := range ycols {
		if field, ok := fieldMap[v]; ok && len(field) == frameLen {
			frame.Fields = append(frame.Fields, data.NewField(v, nil, field))
		}
	}
	*frames = append(*frames, frame)
}
// BuildTable renders logs as a table frame with one string field per ycol.
// With a single empty ycol the columns come from the server-reported keys, or
// failing that from the first log line's keys (minus __time__/__source__).
// When xcol names a numeric epoch-seconds column, an extra "time" field is
// appended.
func (ds *SlsDatasource) BuildTable(logs []map[string]string, xcol string, ycols []string, keys []string, frames *data.Frames) {
	frame := data.NewFrame(strings.Join(ycols, ","))
	fieldMap := make(map[string][]string)
	var keyArr []string
	var times []time.Time
	if len(ycols) == 1 && ycols[0] == "" && len(logs) > 0 {
		ycols = ycols[:0]
		if len(keys) > 0 {
			ycols = append(ycols, keys...)
		} else {
			// NOTE(review): discovery from logs[0] assumes every line has the
			// same keys, and map iteration makes this column order random.
			for k := range logs[0] {
				if k != "__time__" && k != "__source__" {
					ycols = append(ycols, k)
				}
			}
		}
	}
	for _, ycol := range ycols {
		fieldMap[ycol] = make([]string, 0)
		keyArr = append(keyArr, ycol)
	}
	for _, alog := range logs {
		for k, v := range alog {
			if fieldMap[k] != nil {
				fieldMap[k] = append(fieldMap[k], v)
			}
			if xcol != "" && xcol == k {
				floatValue, err := strconv.ParseFloat(v, 64)
				if err != nil {
					log.DefaultLogger.Debug("BuildTable", "ParseTime", err)
					continue
				}
				t := time.Unix(int64(floatValue), 0)
				times = append(times, t)
			}
		}
	}
	for _, v := range keyArr {
		frame.Fields = append(frame.Fields, data.NewField(v, nil, fieldMap[v]))
	}
	// NOTE(review): if some xcol values fail to parse, times can be shorter
	// than the other fields, yielding a ragged frame — confirm upstream data.
	if len(times) > 0 {
		frame.Fields = append(frame.Fields, data.NewField("time", nil, times))
	}
	*frames = append(*frames, frame)
}
// BuildLogs renders raw (non-analytic) results as a logs-visualization frame:
// a time field built from __time__ seconds (plus optional __time_ns_part__
// nanoseconds), a combined `key="value"` message field, and one string field
// per column. With a single empty ycol, columns are discovered as the union of
// keys across all log lines in first-seen order.
func (ds *SlsDatasource) BuildLogs(logs []map[string]string, ycols []string, frames *data.Frames) {
	frame := data.NewFrame("")
	frame.Meta = &data.FrameMeta{
		PreferredVisualization: data.VisTypeLogs,
	}
	fieldMap := make(map[string][]string)
	var keyArr []string
	var times []time.Time
	if len(ycols) == 1 && ycols[0] == "" {
		// No explicit columns: collect every key seen in any log line.
		for _, alog := range logs {
			for k := range alog {
				if _, ok := fieldMap[k]; !ok {
					fieldMap[k] = make([]string, 0)
					keyArr = append(keyArr, k)
				}
			}
		}
	} else {
		for _, ycol := range ycols {
			fieldMap[ycol] = make([]string, 0)
			keyArr = append(keyArr, ycol)
		}
	}
	var values []string
	for _, alog := range logs {
		message := ""
		for _, k := range keyArr {
			fieldMap[k] = append(fieldMap[k], alog[k])
			// Double quotes in values are downgraded to single quotes so the
			// key="value" message stays parseable.
			message = message + k + `="` + strings.ReplaceAll(alog[k], `"`, `'`) + `" `
		}
		timeValue, _ := strconv.ParseFloat(alog["__time__"], 64)
		var t time.Time
		if ns, ok := alog["__time_ns_part__"]; ok {
			// The parsed int64 ns deliberately shadows the string ns above.
			ns, _ := strconv.ParseInt(ns, 10, 64)
			t = time.Unix(int64(timeValue), ns)
		} else {
			t = time.Unix(int64(timeValue), 0)
		}
		times = append(times, t)
		values = append(values, message)
	}
	if len(times) > 0 {
		frame.Fields = append(frame.Fields, data.NewField("time", nil, times))
	}
	frame.Fields = append(frame.Fields, data.NewField("message", nil, values))
	for _, v := range keyArr {
		frame.Fields = append(frame.Fields, data.NewField(v, nil, fieldMap[v]))
	}
	*frames = append(*frames, frame)
}
// BuildTrace converts span logs into a single trace-visualization frame.
// Expected query shape:
// *| select traceID,spanID,parentSpanID,service,host,resource,attribute,statusCode,statusMessage,logs,name,start,duration limit 100
// start and duration arrive in microseconds and are emitted as milliseconds.
func (ds *SlsDatasource) BuildTrace(logs []map[string]string, frames *data.Frames) {
	frame := data.NewFrame("response")
	frame.Meta = &data.FrameMeta{
		PreferredVisualization: data.VisTypeTrace,
	}
	// String columns before the numeric pair, as {field name, log key}.
	head := [][2]string{
		{"operationName", "name"},
		{"traceID", "traceID"},
		{"spanID", "spanID"},
		{"parentSpanID", "parentSpanID"},
		{"serviceName", "service"},
	}
	// String columns after the numeric pair.
	tail := [][2]string{
		{"resource", "resource"},
		{"host", "host"},
		{"attribute", "attribute"},
		{"statusCode", "statusCode"},
		{"statusMessage", "statusMessage"},
		{"logs", "logs"},
	}
	// column extracts one log key across all spans, preserving order.
	column := func(key string) []string {
		vals := make([]string, 0, len(logs))
		for _, alog := range logs {
			vals = append(vals, alog[key])
		}
		return vals
	}
	startTime := make([]float64, 0, len(logs))
	duration := make([]float64, 0, len(logs))
	for _, alog := range logs {
		startV, err := strconv.ParseFloat(alog["start"], 64)
		if err != nil {
			log.DefaultLogger.Debug("BuildTrace", "ParseFloat", err)
		}
		startTime = append(startTime, startV/1000)
		durationV, err := strconv.ParseFloat(alog["duration"], 64)
		if err != nil {
			log.DefaultLogger.Debug("BuildTrace", "ParseFloat", err)
		}
		duration = append(duration, durationV/1000)
	}
	for _, c := range head {
		frame.Fields = append(frame.Fields, data.NewField(c[0], nil, column(c[1])))
	}
	frame.Fields = append(frame.Fields, data.NewField("startTime", nil, startTime))
	frame.Fields = append(frame.Fields, data.NewField("duration", nil, duration))
	for _, c := range tail {
		frame.Fields = append(frame.Fields, data.NewField(c[0], nil, column(c[1])))
	}
	*frames = append(*frames, frame)
}
// mapToSlice flattens m into a slice ordered by timeArr; timestamps missing
// from m contribute the zero value.
func mapToSlice(timeArr []int64, m map[int64]float64) []float64 {
	out := make([]float64, len(timeArr))
	for i, ts := range timeArr {
		out[i] = m[ts]
	}
	return out
}
// toTimeRe matches "YYYY?MM?DD?hh?mm?ss"-style timestamps where ? is any
// single separator character. Compiled once at package scope instead of on
// every call (toTime runs once per log line when sorting).
var toTimeRe = regexp.MustCompile(`(\d{4})\S(\d{2})\S(\d{2})[\s\S](\d{2})\S(\d{2})\S(\d{2}).*`)

// toTimeLoc is the zone used for textual timestamps, loaded once. A failed
// lookup leaves it nil, which the time package treats as UTC — the same
// behavior as the previous per-call LoadLocation with its error ignored.
var toTimeLoc, _ = time.LoadLocation("Asia/Shanghai")

// toTime parses an SLS timestamp string: a numeric value is taken as Unix
// seconds, or milliseconds when the string is exactly 13 characters long;
// otherwise a "YYYY-MM-DD hh:mm:ss"-like layout is extracted via toTimeRe and
// parsed in toTimeLoc. Unparseable input yields the zero time.Time.
func toTime(sTime string) (t time.Time) {
	if v, err := strconv.ParseFloat(sTime, 64); err == nil {
		if len(sTime) == 13 {
			// 13 digits => epoch milliseconds.
			t = time.Unix(int64(v)/1000, 0)
		} else {
			t = time.Unix(int64(v), 0)
		}
		return
	}
	matched := toTimeRe.FindAllStringSubmatch(sTime, -1)
	if matched != nil {
		s := fmt.Sprintf("%s-%s-%s %s:%s:%s", matched[0][1], matched[0][2], matched[0][3],
			matched[0][4], matched[0][5], matched[0][6])
		t, _ = time.ParseInLocation("2006-01-02 15:04:05", s, toTimeLoc)
	}
	return
}
// legendPlaceholderRe matches {{label}} placeholders in a legend format string.
var legendPlaceholderRe = regexp.MustCompile(`{{\s*([^}]+)\s*}}`)

// formatDisplayName renders a series legend: {{label}} placeholders in format
// (whitespace around the name is ignored) are replaced with values from
// labels. If no placeholder resolved, it falls back to a sorted
// `{ k="v", ... }` rendering of labels, or to query when labels is empty.
func formatDisplayName(format string, labels map[string]string, query string) string {
	matches := legendPlaceholderRe.FindAllStringSubmatch(format, -1)
	// Tracks whether at least one placeholder resolved to a label value.
	hasMatched := false
	for _, match := range matches {
		placeholder := match[0]
		// Trim the captured name: `([^}]+)` is greedy and keeps trailing
		// whitespace, which previously made "{{ host }}" fail to resolve.
		labelName := strings.TrimSpace(match[1])
		if value, ok := labels[labelName]; ok {
			format = strings.ReplaceAll(format, placeholder, value)
			hasMatched = true
		}
	}
	if hasMatched {
		return format
	}
	// Fallback: render labels as a sorted, Prometheus-style label set.
	if len(labels) == 0 {
		// No labels either — fall back to the raw query text.
		return query
	}
	keys := make([]string, 0, len(labels))
	for key := range labels {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, key := range keys {
		parts = append(parts, fmt.Sprintf(`%s="%s"`, key, labels[key]))
	}
	return fmt.Sprintf(`{ %s }`, strings.Join(parts, ", "))
}