func()

in log_store.go [529:627]


// GetLogsBytesWithQuery issues a GET against
// /logstores/<s.Name>/shards/<plr.ShardID>, using the URL parameters produced
// by plr.ToURLParams, and returns the decompressed response body together with
// metadata parsed from the response headers.
//
// plr.CompressType must lie in [0, Compress_Max); any other value is rejected
// before a request is made. Compress_ZSTD asks the server for "zstd" encoding,
// every other accepted value asks for "lz4".
//
// On a non-200 response the body is decoded as a JSON *Error and reported as
// "<code>:<message>". On success the body is decompressed according to the
// X-Log-Compresstype response header ("lz4" or "zstd") and the result must be
// exactly X-Log-Bodyrawsize bytes long. When X-Log-Bodyrawsize is 0 an empty,
// non-nil slice is returned without attempting decompression.
//
// Whenever the returned error is non-nil, both the byte slice and the metadata
// are nil.
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogMeta, error) {
	h := map[string]string{
		"x-log-bodyrawsize": "0",
		"Accept":            "application/x-protobuf",
	}
	// Validate the caller-supplied value; comparing the constants with each
	// other would never reject anything.
	if plr.CompressType < 0 || plr.CompressType >= Compress_Max {
		return nil, nil, fmt.Errorf("unsupported compress type: %d", plr.CompressType)
	}
	if plr.CompressType == Compress_ZSTD {
		h["Accept-Encoding"] = "zstd"
	} else {
		h["Accept-Encoding"] = "lz4"
	}
	urlVal := plr.ToURLParams()
	uri := fmt.Sprintf("/logstores/%v/shards/%v?%s", s.Name, plr.ShardID, urlVal.Encode())

	r, err := request(s.project, "GET", uri, h, nil)
	if err != nil {
		return nil, nil, err
	}
	defer r.Body.Close()
	buf, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, nil, err
	}
	if r.StatusCode != http.StatusOK {
		errMsg := &Error{}
		err = json.Unmarshal(buf, errMsg)
		if err != nil {
			// Only pay for the response dump when it is actually going to
			// be logged.
			if IsDebugLevelMatched(1) {
				dump, _ := httputil.DumpResponse(r, true)
				level.Error(Logger).Log("msg", string(dump))
			}
			return nil, nil, fmt.Errorf("failed parse errorCode json: %w", err)
		}
		return nil, nil, fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
	}
	// Netflow is the size of the body as received, before decompression.
	netflow := len(buf)

	nextCursor, err := parseHeaderString(r.Header, "X-Log-Cursor")
	if err != nil {
		return nil, nil, err
	}
	rawSize, err := ParseHeaderInt(r, "X-Log-Bodyrawsize")
	if err != nil {
		return nil, nil, err
	}
	count, err := ParseHeaderInt(r, "X-Log-Count")
	if err != nil {
		return nil, nil, err
	}
	// Best effort: a failure to read this header is deliberately ignored.
	readLastCursor, _ := parseHeaderString(r.Header, "X-Log-Read-Last-Cursor")
	pullMeta := &PullLogMeta{
		RawSize:        rawSize,
		NextCursor:     nextCursor,
		Netflow:        netflow,
		Count:          count,
		readLastCursor: readLastCursor,
	}
	// When a non-empty query was sent, pick up the additional statistics
	// headers. Parse failures are ignored here (best effort).
	if plr.Query != "" {
		pullMeta.RawSizeBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatasize")
		pullMeta.DataCountBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatacount")
		pullMeta.Lines, _ = ParseHeaderInt(r, "X-Log-Resultlines")
		pullMeta.LinesBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatalines")
		pullMeta.FailedLines, _ = ParseHeaderInt(r, "X-Log-Failedlines")
	}
	if rawSize == 0 {
		return make([]byte, 0), pullMeta, nil
	}
	// A negative size would make the allocation below panic; report it as an
	// error instead.
	if rawSize < 0 {
		return nil, nil, fmt.Errorf("invalid 'x-log-bodyrawsize' %d", rawSize)
	}

	// Decompress into a buffer of the size announced by the server.
	out := make([]byte, rawSize)
	compressType, err := parseHeaderString(r.Header, "X-Log-Compresstype")
	if err != nil {
		return nil, nil, err
	}
	switch compressType {
	case "lz4":
		var uncompressedSize int
		if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil {
			return nil, nil, err
		}
		if uncompressedSize != rawSize {
			return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, rawSize)
		}
	case "zstd":
		out, err = slsZstdCompressor.Decompress(buf, out)
		if err != nil {
			return nil, nil, err
		}
		if len(out) != rawSize {
			return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), rawSize)
		}
	default:
		return nil, nil, fmt.Errorf("unexpected compress type: %s", compressType)
	}
	return out, pullMeta, nil
}