in log_store.go [529:627]
// GetLogsBytesWithQuery pulls log data from one shard of this logstore and
// returns the decompressed response body together with metadata parsed from
// the response headers.
//
// plr.CompressType must lie in [0, Compress_Max); otherwise an error is
// returned before any request is sent. Compress_ZSTD asks the server for zstd,
// every other accepted value asks for lz4. The algorithm actually used to
// decompress is taken from the "X-Log-Compresstype" response header.
//
// On a non-200 response the body is decoded as a JSON Error and reported as
// "code:message". When the "X-Log-Bodyrawsize" header is 0, an empty slice is
// returned along with the metadata. When plr.Query is non-empty, additional
// query statistics headers are read into the metadata on a best-effort basis.
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogMeta, error) {
	h := map[string]string{
		"x-log-bodyrawsize": "0",
		"Accept":            "application/x-protobuf",
	}
	// Validate the requested compress type itself; comparing two constants
	// here would let out-of-range values slip through to the lz4 default.
	if plr.CompressType < 0 || plr.CompressType >= Compress_Max {
		return nil, nil, fmt.Errorf("unsupported compress type: %d", plr.CompressType)
	}
	if plr.CompressType == Compress_ZSTD {
		h["Accept-Encoding"] = "zstd"
	} else {
		h["Accept-Encoding"] = "lz4"
	}

	urlVal := plr.ToURLParams()
	uri := fmt.Sprintf("/logstores/%v/shards/%v?%s", s.Name, plr.ShardID, urlVal.Encode())
	r, err := request(s.project, "GET", uri, h, nil)
	if err != nil {
		return nil, nil, err
	}
	defer r.Body.Close()

	buf, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, nil, err
	}

	if r.StatusCode != http.StatusOK {
		errMsg := &Error{}
		err = json.Unmarshal(buf, errMsg)
		if err != nil {
			// NOTE(review): r.Body was already consumed by ReadAll above, so
			// this dump likely carries only the status line and headers.
			dump, _ := httputil.DumpResponse(r, true)
			if IsDebugLevelMatched(1) {
				level.Error(Logger).Log("msg", string(dump))
			}
			return nil, nil, fmt.Errorf("failed parse errorCode json: %w", err)
		}
		return nil, nil, fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
	}

	// Netflow is the size of the body as received, i.e. before decompression.
	netflow := len(buf)

	nextCursor, err := parseHeaderString(r.Header, "X-Log-Cursor")
	if err != nil {
		return nil, nil, err
	}
	rawSize, err := ParseHeaderInt(r, "X-Log-Bodyrawsize")
	if err != nil {
		return nil, nil, err
	}
	count, err := ParseHeaderInt(r, "X-Log-Count")
	if err != nil {
		return nil, nil, err
	}
	// Errors reading this header are ignored, so it is effectively optional.
	readLastCursor, _ := parseHeaderString(r.Header, "X-Log-Read-Last-Cursor")

	pullMeta := &PullLogMeta{
		RawSize:        rawSize,
		NextCursor:     nextCursor,
		Netflow:        netflow,
		Count:          count,
		readLastCursor: readLastCursor,
	}
	// When a query was supplied, read the extra statistics headers. Parse
	// errors are ignored here and leave the corresponding field at whatever
	// ParseHeaderInt returned.
	if plr.Query != "" {
		pullMeta.RawSizeBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatasize")
		pullMeta.DataCountBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatacount")
		pullMeta.Lines, _ = ParseHeaderInt(r, "X-Log-Resultlines")
		pullMeta.LinesBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatalines")
		pullMeta.FailedLines, _ = ParseHeaderInt(r, "X-Log-Failedlines")
	}

	if rawSize == 0 {
		return make([]byte, 0), pullMeta, nil
	}
	// A negative size would make the allocation below panic; report it as an
	// error instead.
	if rawSize < 0 {
		return nil, nil, fmt.Errorf("invalid 'x-log-bodyrawsize' %d", rawSize)
	}

	// decompress data
	out := make([]byte, rawSize)
	compressType, err := parseHeaderString(r.Header, "X-Log-Compresstype")
	if err != nil {
		return nil, nil, err
	}
	switch compressType {
	case "lz4":
		uncompressedSize, err := lz4.UncompressBlock(buf, out)
		if err != nil {
			return nil, nil, err
		}
		if uncompressedSize != rawSize {
			return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, rawSize)
		}
	case "zstd":
		out, err = slsZstdCompressor.Decompress(buf, out)
		if err != nil {
			return nil, nil, err
		}
		if len(out) != rawSize {
			return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), rawSize)
		}
	default:
		return nil, nil, fmt.Errorf("unexpected compress type: %s", compressType)
	}
	return out, pullMeta, nil
}