func()

in pkg/sls-plugin.go [240:448]


func (ds *SlsDatasource) QueryLogs(ch chan Result, query backend.DataQuery, client sls.ClientInterface, logSource *LogSource) {
	response := backend.DataResponse{}
	refId := query.RefID
	queryInfo := &QueryInfo{}

	defer func() {
		queryInfo = nil
		if r := recover(); r != nil {
			switch r.(type) {
			case string:
				response.Error = errors.New(r.(string))
			case error:
				response.Error = r.(error)
			}
			log.DefaultLogger.Error("QueryLogs recover", "refId", refId, "error", response.Error)
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
		}
	}()

	err := json.Unmarshal(query.JSON, &queryInfo)
	if err != nil {
		log.DefaultLogger.Error("Unmarshal queryInfo", "refId", refId, "error", err)
		response.Error = err
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	xcol := queryInfo.Xcol

	from := query.TimeRange.From.Unix()
	to := query.TimeRange.To.Unix()

	var logStore string
	if queryInfo.LogStore != "" {
		logStore = queryInfo.LogStore
	} else {
		logStore = logSource.LogStore
	}

	// 如果 logstore 为空 返回错误
	if logStore == "" {
		response.Error = errors.New("logStore is empty, please select a logstore")
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}

	// 如果是 metric 类型 直连 Prometheus 获取数据
	if queryInfo.Type == "metricstore" {
		var metricFrames data.Frames
		err := ds.getMetricLogs(ch, query, queryInfo, logSource, response, &metricFrames)

		if err != nil {
			response.Error = err
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
		} else {
			response.Frames = metricFrames
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
		}
		return
	}

	var ycols []string
	queryCount := ds.GetQueryCount(queryInfo)
	getLogsResp := &sls.GetLogsResponse{}
	for i := 0; i < int(queryCount); i++ {
		currentPage := queryInfo.CurrentPage
		if currentPage <= 0 {
			currentPage = 1
		}
		offset := (currentPage - 1) * queryInfo.LogsPerPage
		getLogsReq := &sls.GetLogRequest{
			From:     from,
			To:       to,
			Query:    queryInfo.Query,
			Lines:    queryInfo.LogsPerPage,
			Offset:   offset,
			Reverse:  true,
			PowerSQL: queryInfo.PowerSql,
		}
		tem, err := client.GetLogsV2(logSource.Project, logStore, getLogsReq)
		if err != nil {
			log.DefaultLogger.Error("GetLogs ", "query : ", queryInfo.Query, "error ", err)
			response.Error = err
			ch <- Result{
				refId:        refId,
				dataResponse: response,
			}
			return
		}
		if i == 0 {
			getLogsResp = tem
		} else {
			getLogsResp.Logs = append(getLogsResp.Logs, tem.Logs...)
			getLogsResp.Count = getLogsResp.Count + tem.Count
		}
		if tem.Count < queryInfo.LogsPerPage {
			break
		}
		queryInfo.CurrentPage++
	}

	logs := getLogsResp.Logs
	c := &Contents{}
	err = json.Unmarshal([]byte(getLogsResp.Contents), &c)
	if err != nil {
		log.DefaultLogger.Error("GetLogs ", "Contents : ", getLogsResp.Contents, "error ", err)
		response.Error = err
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	if getLogsResp.Progress != "Complete" {
		log.DefaultLogger.Warn("GetLogs ", "Progress: ", getLogsResp.Progress, "warn: ", "incomplete logs, you can switch to powerSql")
		response.Error = errors.New("incomplete logs, you can switch to powerSql")
	}

	keys := c.Keys
	if compatible {
		queryInfo.Ycol = strings.Replace(queryInfo.Ycol, " ", "", -1)
	}

	isFlowGraph := strings.Contains(queryInfo.Ycol, "#:#")
	if isFlowGraph {
		ycols = strings.Split(queryInfo.Ycol, "#:#")
	} else {
		ycols = strings.Split(queryInfo.Ycol, ",")
	}
	if !compatible {
		for i := range ycols {
			ycols[i] = strings.TrimSpace(ycols[i])
		}
	}
	log.DefaultLogger.Debug("QueryLogs", "getLogsResp", getLogsResp)
	getLogsResp = nil
	var frames data.Frames

	if xcol == "trace" {
		log.DefaultLogger.Debug("BuildTrace")
		ds.BuildTrace(logs, &frames)
		response.Frames = frames
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	if !strings.Contains(queryInfo.Query, "|") {
		log.DefaultLogger.Debug("BuildLogs")
		ds.BuildLogs(logs, ycols, &frames)
		response.Frames = frames
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}

	if isFlowGraph {
		log.DefaultLogger.Debug("flow_graph")
		err = ds.BuildFlowGraph(logs, xcol, ycols, &frames)
		if err != nil {
			response.Error = err
		} else {
			response.Frames = frames
		}
		ch <- Result{
			refId:        refId,
			dataResponse: response,
		}
		return
	}
	if xcol == "bar" {
		log.DefaultLogger.Debug("bar")
		ds.BuildBarGraph(logs, ycols, &frames)
	} else if xcol == "map" {
		log.DefaultLogger.Debug("map")
		ds.BuildMapGraph(logs, ycols, &frames)
	} else if xcol == "pie" {
		log.DefaultLogger.Debug("pie")
		ds.BuildPieGraph(logs, ycols, &frames)
	} else if xcol != "" && xcol != "map" && xcol != "pie" && xcol != "bar" && xcol != "table" {
		log.DefaultLogger.Debug("time_graph")
		ds.BuildTimingGraph(logs, xcol, ycols, keys, &frames)
	} else {
		log.DefaultLogger.Debug("table")
		ds.BuildTable(logs, xcol, ycols, keys, &frames)
	}
	response.Frames = frames
	ch <- Result{
		refId:        refId,
		dataResponse: response,
	}
}