in pkg/sls-plugin.go [240:448]
// QueryLogs executes one data query and delivers its outcome on ch as a Result
// tagged with the query's RefID.
//
// The query JSON is decoded into a QueryInfo. Queries whose Type is
// "metricstore" are delegated to ds.getMetricLogs; every other query is fetched
// page by page through client.GetLogsV2 and then rendered into frames according
// to queryInfo.Xcol / queryInfo.Ycol (trace, raw logs, flow graph, bar, map,
// pie, time series or table).
//
// Any panic raised while processing is recovered, converted into
// response.Error and reported on ch, so a failing query surfaces as an error
// result rather than crashing the caller.
func (ds *SlsDatasource) QueryLogs(ch chan Result, query backend.DataQuery, client sls.ClientInterface, logSource *LogSource) {
	response := backend.DataResponse{}
	refId := query.RefID

	// send publishes the current state of response for this query.
	send := func() {
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
	}
	// fail records err on the response and publishes it.
	fail := func(err error) {
		response.Error = err
		send()
	}

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		switch v := r.(type) {
		case error:
			response.Error = v
		case string:
			response.Error = errors.New(v)
		default:
			// A panic value that is neither a string nor an error must still
			// be reported as a failure; otherwise the caller would receive an
			// empty, seemingly successful response.
			log.DefaultLogger.Error("QueryLogs recover", "refId", refId, "panic", r)
			response.Error = errors.New("QueryLogs: unexpected panic")
		}
		log.DefaultLogger.Error("QueryLogs recover", "refId", refId, "error", response.Error)
		send()
	}()

	// Decode into the struct itself (not a pointer to the pointer) so that a
	// JSON `null` payload leaves a usable zero-valued QueryInfo instead of a
	// nil pointer.
	queryInfo := &QueryInfo{}
	if err := json.Unmarshal(query.JSON, queryInfo); err != nil {
		log.DefaultLogger.Error("Unmarshal queryInfo", "refId", refId, "error", err)
		fail(err)
		return
	}

	xcol := queryInfo.Xcol
	from := query.TimeRange.From.Unix()
	to := query.TimeRange.To.Unix()

	// The logstore named in the query takes precedence over the one
	// configured on the log source.
	logStore := queryInfo.LogStore
	if logStore == "" {
		logStore = logSource.LogStore
	}
	// Without a logstore there is nothing to query: report an error.
	if logStore == "" {
		fail(errors.New("logStore is empty, please select a logstore"))
		return
	}

	// Metric-store queries take a separate path (per the original comment,
	// the data is fetched by talking to Prometheus directly).
	if queryInfo.Type == "metricstore" {
		var metricFrames data.Frames
		if err := ds.getMetricLogs(ch, query, queryInfo, logSource, response, &metricFrames); err != nil {
			fail(err)
			return
		}
		response.Frames = metricFrames
		send()
		return
	}

	// Normalise the starting page once. Clamping inside the loop while
	// incrementing the unclamped value would request the first page again on
	// the next iteration and duplicate its logs.
	page := queryInfo.CurrentPage
	if page <= 0 {
		page = 1
	}

	queryCount := ds.GetQueryCount(queryInfo)
	getLogsResp := &sls.GetLogsResponse{}
	for i := 0; i < int(queryCount); i++ {
		getLogsReq := &sls.GetLogRequest{
			From:     from,
			To:       to,
			Query:    queryInfo.Query,
			Lines:    queryInfo.LogsPerPage,
			Offset:   (page - 1) * queryInfo.LogsPerPage,
			Reverse:  true,
			PowerSQL: queryInfo.PowerSql,
		}
		pageResp, err := client.GetLogsV2(logSource.Project, logStore, getLogsReq)
		if err != nil {
			log.DefaultLogger.Error("GetLogs ", "query : ", queryInfo.Query, "error ", err)
			fail(err)
			return
		}
		if i == 0 {
			// The first page supplies the response metadata (Contents,
			// Progress); later pages only contribute logs and counts.
			getLogsResp = pageResp
		} else {
			getLogsResp.Logs = append(getLogsResp.Logs, pageResp.Logs...)
			getLogsResp.Count += pageResp.Count
		}
		// A short page means there is no further data to fetch.
		if pageResp.Count < queryInfo.LogsPerPage {
			break
		}
		page++
	}

	logs := getLogsResp.Logs

	// Decode into a value so that a JSON `null` yields empty keys rather than
	// a nil pointer dereference.
	var contents Contents
	if err := json.Unmarshal([]byte(getLogsResp.Contents), &contents); err != nil {
		log.DefaultLogger.Error("GetLogs ", "Contents : ", getLogsResp.Contents, "error ", err)
		fail(err)
		return
	}
	keys := contents.Keys

	// An incomplete result is flagged on the response, but frame building
	// still proceeds, so the result may carry both an error and frames.
	if getLogsResp.Progress != "Complete" {
		log.DefaultLogger.Warn("GetLogs ", "Progress: ", getLogsResp.Progress, "warn: ", "incomplete logs, you can switch to powerSql")
		response.Error = errors.New("incomplete logs, you can switch to powerSql")
	}

	// In compatible mode every space is stripped from Ycol before splitting;
	// otherwise only the surrounding whitespace of each column is trimmed.
	if compatible {
		queryInfo.Ycol = strings.Replace(queryInfo.Ycol, " ", "", -1)
	}
	var ycols []string
	isFlowGraph := strings.Contains(queryInfo.Ycol, "#:#")
	if isFlowGraph {
		ycols = strings.Split(queryInfo.Ycol, "#:#")
	} else {
		ycols = strings.Split(queryInfo.Ycol, ",")
	}
	if !compatible {
		for i := range ycols {
			ycols[i] = strings.TrimSpace(ycols[i])
		}
	}
	log.DefaultLogger.Debug("QueryLogs", "getLogsResp", getLogsResp)

	var frames data.Frames

	if xcol == "trace" {
		log.DefaultLogger.Debug("BuildTrace")
		ds.BuildTrace(logs, &frames)
		response.Frames = frames
		send()
		return
	}

	// A query without a "|" is rendered as plain log lines.
	if !strings.Contains(queryInfo.Query, "|") {
		log.DefaultLogger.Debug("BuildLogs")
		ds.BuildLogs(logs, ycols, &frames)
		response.Frames = frames
		send()
		return
	}

	if isFlowGraph {
		log.DefaultLogger.Debug("flow_graph")
		if err := ds.BuildFlowGraph(logs, xcol, ycols, &frames); err != nil {
			response.Error = err
		} else {
			response.Frames = frames
		}
		send()
		return
	}

	switch xcol {
	case "bar":
		log.DefaultLogger.Debug("bar")
		ds.BuildBarGraph(logs, ycols, &frames)
	case "map":
		log.DefaultLogger.Debug("map")
		ds.BuildMapGraph(logs, ycols, &frames)
	case "pie":
		log.DefaultLogger.Debug("pie")
		ds.BuildPieGraph(logs, ycols, &frames)
	case "", "table":
		log.DefaultLogger.Debug("table")
		ds.BuildTable(logs, xcol, ycols, keys, &frames)
	default:
		// Any other non-empty xcol is passed to the timing-graph builder.
		log.DefaultLogger.Debug("time_graph")
		ds.BuildTimingGraph(logs, xcol, ycols, keys, &frames)
	}
	response.Frames = frames
	send()
}