log_store.go (1,079 lines of code) (raw):
package sls
import (
"encoding/json"
"fmt"
"time"
"io/ioutil"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pierrec/lz4/v4"
)
// this file is deprecated and no maintenance
// see client_logstore.go
// LogStore defines LogStore struct
type LogStore struct {
Name string `json:"logstoreName"`
TTL int `json:"ttl"`
ShardCount int `json:"shardCount"`
WebTracking bool `json:"enable_tracking"`
AutoSplit bool `json:"autoSplit"`
MaxSplitShard int `json:"maxSplitShard"`
AppendMeta bool `json:"appendMeta"`
TelemetryType string `json:"telemetryType"`
HotTTL int32 `json:"hot_ttl,omitempty"`
Mode string `json:"mode,omitempty"` // "query" or "standard"(default), can't be modified after creation
CreateTime uint32 `json:"createTime,omitempty"`
LastModifyTime uint32 `json:"lastModifyTime,omitempty"`
project *LogProject
putLogCompressType int
EncryptConf *EncryptConf `json:"encrypt_conf,omitempty"`
ProductType string `json:"productType,omitempty"`
useMetricStoreURL bool
}
// Shard defines shard struct
type Shard struct {
ShardID int `json:"shardID"`
Status string `json:"status"`
InclusiveBeginKey string `json:"inclusiveBeginKey"`
ExclusiveBeginKey string `json:"exclusiveEndKey"`
CreateTime int `json:"createTime"`
}
// encrypt struct
type EncryptConf struct {
Enable bool `json:"enable"`
EncryptType string `json:"encrypt_type"`
UserCmkInfo *EncryptUserCmkConf `json:"user_cmk_info,omitempty"`
}
// EncryptUserCmkConf struct
type EncryptUserCmkConf struct {
CmkKeyId string `json:"cmk_key_id"`
Arn string `json:"arn"`
RegionId string `json:"region_id"`
}
// NewLogStore ...
func NewLogStore(logStoreName string, project *LogProject) (*LogStore, error) {
return &LogStore{
Name: logStoreName,
project: project,
}, nil
}
// SetPutLogCompressType set put log's compress type, default lz4
func (s *LogStore) SetPutLogCompressType(compressType int) error {
if compressType < 0 || compressType >= Compress_Max {
return InvalidCompressError
}
s.putLogCompressType = compressType
return nil
}
// ListShards returns shard id list of this logstore.
func (s *LogStore) ListShards() (shardIDs []*Shard, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := &Error{}
if jErr := json.Unmarshal(buf, err); jErr != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return nil, err
}
var shards []*Shard
err = json.Unmarshal(buf, &shards)
if err != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return shards, nil
}
func copyIncompressible(src, dst []byte) (int, error) {
lLen, dn := len(src), len(dst)
di := 0
if lLen < 0xF {
dst[di] = byte(lLen << 4)
} else {
dst[di] = 0xF0
if di++; di == dn {
return di, nil
}
lLen -= 0xF
for ; lLen >= 0xFF; lLen -= 0xFF {
dst[di] = 0xFF
if di++; di == dn {
return di, nil
}
}
dst[di] = byte(lLen)
}
if di++; di+len(src) > dn {
return di, nil
}
di += copy(dst[di:], src)
return di, nil
}
// PutRawLog put raw log data to log service, no marshal
func (s *LogStore) PutRawLog(rawLogData []byte) (err error) {
if len(rawLogData) == 0 {
// empty log group
return nil
}
var out []byte
var h map[string]string
var outLen int
switch s.putLogCompressType {
case Compress_LZ4:
// Compresse body with lz4
out = make([]byte, lz4.CompressBlockBound(len(rawLogData)))
var hashTable [1 << 16]int
n, err := lz4.CompressBlock(rawLogData, out, hashTable[:])
if err != nil {
return NewClientError(err)
}
// copy incompressible data as lz4 format
if n == 0 {
n, _ = copyIncompressible(rawLogData, out)
}
h = map[string]string{
"x-log-compresstype": "lz4",
"x-log-bodyrawsize": strconv.Itoa(len(rawLogData)),
"Content-Type": "application/x-protobuf",
}
outLen = n
case Compress_ZSTD:
out, _ = slsZstdCompressor.Compress(rawLogData, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(rawLogData)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
case Compress_None:
// no compress
out = rawLogData
h = map[string]string{
"x-log-bodyrawsize": strconv.Itoa(len(rawLogData)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
}
uri := fmt.Sprintf("/logstores/%v", s.Name)
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(buf, err); jErr != nil {
return NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return err
}
return nil
}
func (s *LogStore) PostRawLogs(body []byte, hashKey *string) (err error) {
if len(body) == 0 {
// empty log group or empty hashkey
return nil
}
if hashKey == nil || *hashKey == "" {
// empty hash call PutLogs
return s.PutRawLog(body)
}
var out []byte
var h map[string]string
var outLen int
switch s.putLogCompressType {
case Compress_LZ4:
// Compresse body with lz4
out = make([]byte, lz4.CompressBlockBound(len(body)))
var hashTable [1 << 16]int
n, err := lz4.CompressBlock(body, out, hashTable[:])
if err != nil {
return NewClientError(err)
}
// copy incompressible data as lz4 format
if n == 0 {
n, _ = copyIncompressible(body, out)
}
h = map[string]string{
"x-log-compresstype": "lz4",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = n
case Compress_ZSTD:
// Compress body with zstd
out, _ = slsZstdCompressor.Compress(body, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
case Compress_None:
// no compress
out = body
h = map[string]string{
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
}
uri := fmt.Sprintf("/logstores/%v/shards/route?key=%v", s.Name, *hashKey)
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(buf, err); jErr != nil {
return NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return err
}
return nil
}
// PutLogs put logs into logstore.
// The callers should transform user logs into LogGroup.
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
if len(lg.Logs) == 0 {
// empty log group
return nil
}
body, err := proto.Marshal(lg)
if err != nil {
return NewClientError(err)
}
var out []byte
var h map[string]string
var outLen int
switch s.putLogCompressType {
case Compress_LZ4:
// Compresse body with lz4
out = make([]byte, lz4.CompressBlockBound(len(body)))
var hashTable [1 << 16]int
n, err := lz4.CompressBlock(body, out, hashTable[:])
if err != nil {
return NewClientError(err)
}
// copy incompressible data as lz4 format
if n == 0 {
n, _ = copyIncompressible(body, out)
}
h = map[string]string{
"x-log-compresstype": "lz4",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = n
case Compress_ZSTD:
// Compress body with zstd
out, _ = slsZstdCompressor.Compress(body, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
case Compress_None:
// no compress
out = body
h = map[string]string{
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
}
var uri string
if s.useMetricStoreURL {
uri = fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name)
} else {
uri = fmt.Sprintf("/logstores/%v", s.Name)
}
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(buf, err); jErr != nil {
return NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return err
}
return nil
}
// PostLogStoreLogs put logs into Shard logstore by hashKey.
// The callers should transform user logs into LogGroup.
func (s *LogStore) PostLogStoreLogs(req *PostLogStoreLogsRequest) (err error) {
if err = s.SetPutLogCompressType(req.CompressType); err != nil {
return err
}
if req.LogGroup == nil || len(req.LogGroup.Logs) == 0 {
// empty log group or empty hashkey
return nil
}
if s.useMetricStoreURL {
return s.PutLogs(req.LogGroup)
}
body, err := proto.Marshal(req.LogGroup)
if err != nil {
return NewClientError(err)
}
var out []byte
var h map[string]string
var outLen int
switch s.putLogCompressType {
case Compress_LZ4:
// Compresse body with lz4
out = make([]byte, lz4.CompressBlockBound(len(body)))
var hashTable [1 << 16]int
n, err := lz4.CompressBlock(body, out, hashTable[:])
if err != nil {
return NewClientError(err)
}
// copy incompressible data as lz4 format
if n == 0 {
n, _ = copyIncompressible(body, out)
}
h = map[string]string{
"x-log-compresstype": "lz4",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = n
case Compress_ZSTD:
// Compress body with zstd
out, _ = slsZstdCompressor.Compress(body, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
case Compress_None:
// no compress
out = body
h = map[string]string{
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
}
var uri = fmt.Sprintf("/logstores/%s", s.Name)
var params = url.Values{}
if req.HashKey != nil && *req.HashKey != "" {
params.Set("key", *req.HashKey)
uri = fmt.Sprintf("/logstores/%s/shards/route", s.Name)
}
if req.Processor != "" {
params.Set("processor", req.Processor)
}
if len(params) > 0 {
uri = fmt.Sprintf("%s?%s", uri, params.Encode())
}
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(buf, err); jErr != nil {
return NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return err
}
return nil
}
// GetCursor gets log cursor of one shard specified by shardId.
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
// For more detail please read: https://help.aliyun.com/document_detail/29024.html
func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
s.Name, shardID, from)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return "", err
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return "", err
}
if r.StatusCode != http.StatusOK {
errMsg := &Error{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get cursor")
dump, _ := httputil.DumpResponse(r, true)
if IsDebugLevelMatched(1) {
level.Error(Logger).Log("msg", string(dump))
}
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Body struct {
Cursor string
}
body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return "", NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
cursor = body.Cursor
return cursor, nil
}
func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.GetLogsBytesV2(plr)
}
// Deprecated: use GetLogsBytesWithQuery instead
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) ([]byte, string, error) {
out, plm, err := s.GetLogsBytesWithQuery(plr)
if err != nil {
return nil, "", err
}
return out, plm.NextCursor, err
}
// GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogMeta, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
}
if plr.CompressType < 0 || Compress_LZ4 >= Compress_Max {
return nil, nil, fmt.Errorf("unsupported compress type: %d", plr.CompressType)
}
if plr.CompressType == Compress_ZSTD {
h["Accept-Encoding"] = "zstd"
} else {
h["Accept-Encoding"] = "lz4"
}
urlVal := plr.ToURLParams()
uri := fmt.Sprintf("/logstores/%v/shards/%v?%s", s.Name, plr.ShardID, urlVal.Encode())
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, nil, err
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, nil, err
}
if r.StatusCode != http.StatusOK {
errMsg := &Error{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
dump, _ := httputil.DumpResponse(r, true)
if IsDebugLevelMatched(1) {
level.Error(Logger).Log("msg", string(dump))
}
return nil, nil, fmt.Errorf("failed parse errorCode json: %w", err)
}
return nil, nil, fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
}
netflow := len(buf)
nextCursor, err := parseHeaderString(r.Header, "X-Log-Cursor")
if err != nil {
return nil, nil, err
}
rawSize, err := ParseHeaderInt(r, "X-Log-Bodyrawsize")
if err != nil {
return nil, nil, err
}
count, err := ParseHeaderInt(r, "X-Log-Count")
if err != nil {
return nil, nil, err
}
readLastCursor, _ := parseHeaderString(r.Header, "X-Log-Read-Last-Cursor")
pullMeta := &PullLogMeta{
RawSize: rawSize,
NextCursor: nextCursor,
Netflow: netflow,
Count: count,
readLastCursor: readLastCursor,
}
// If query is not nil, extract more headers
if plr.Query != "" {
pullMeta.RawSizeBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatasize")
pullMeta.DataCountBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatacount")
pullMeta.Lines, _ = ParseHeaderInt(r, "X-Log-Resultlines")
pullMeta.LinesBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatalines")
pullMeta.FailedLines, _ = ParseHeaderInt(r, "X-Log-Failedlines")
}
if rawSize == 0 {
return make([]byte, 0), pullMeta, nil
}
// decompress data
out := make([]byte, rawSize)
compressType, err := parseHeaderString(r.Header, "X-Log-Compresstype")
if err != nil {
return nil, nil, err
}
switch compressType {
case "lz4":
uncompressedSize := 0
if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil {
return nil, nil, err
}
if uncompressedSize != rawSize {
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, rawSize)
}
case "zstd":
out, err = slsZstdCompressor.Decompress(buf, out)
if err != nil {
return nil, nil, err
}
if len(out) != rawSize {
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), rawSize)
}
default:
return nil, nil, fmt.Errorf("unexpected compress type: %s", compressType)
}
return out, pullMeta, nil
}
// LogsBytesDecode decodes logs binary data returned by GetLogsBytes API
func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
gl = &LogGroupList{}
err = proto.Unmarshal(data, gl)
if err != nil {
return nil, err
}
return gl, nil
}
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next cursor can be used to read logs at next time.
// @note if you want to pull logs continuous, set endCursor = ""
func (s *LogStore) PullLogs(shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.PullLogsV2(plr)
}
// Deprecated: use PullLogsWithQuery instead
func (s *LogStore) PullLogsV2(plr *PullLogRequest) (*LogGroupList, string, error) {
gl, plm, err := s.PullLogsWithQuery(plr)
if err != nil {
return nil, "", err
}
return gl, plm.NextCursor, err
}
func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) {
out, plm, err := s.GetLogsBytesWithQuery(plr)
if err != nil {
return nil, nil, err
}
gl, err = LogsBytesDecode(out)
if err != nil {
return nil, nil, err
}
if plm.Count > 0 && plm.readLastCursor != "" && plr.Query == "" {
gl.addCursorIfPossible(plm.readLastCursor)
}
return
}
// GetHistograms query logs with [from, to) time range
func (s *LogStore) GetHistograms(topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
return s.GetHistogramsV2(&GetHistogramRequest{
Topic: topic,
From: from,
To: to,
Query: queryExp,
})
}
func (s *LogStore) GetHistogramsV2(ghr *GetHistogramRequest) (*GetHistogramsResponse, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/json",
}
urlVal := ghr.ToURLParams()
uri := fmt.Sprintf("/logstores/%s?%s", s.Name, urlVal.Encode())
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(buf, err); jErr != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return nil, err
}
histograms := []SingleHistogram{}
err = json.Unmarshal(buf, &histograms)
if err != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
count, err := strconv.ParseInt(r.Header.Get(GetLogsCountHeader), 10, 64)
if err != nil {
return nil, err
}
getHistogramsResponse := GetHistogramsResponse{
Progress: r.Header[ProgressHeader][0],
Count: count,
Histograms: histograms,
}
return &getHistogramsResponse, nil
}
// GetLogLines query logs with [from, to) time range
func (s *LogStore) GetLogLines(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
var req GetLogRequest
req.Topic = topic
req.From = from
req.To = to
req.Query = queryExp
req.Lines = maxLineNum
req.Offset = offset
req.Reverse = reverse
return s.GetLogLinesV2(&req)
}
// GetLogLinesByNano query logs with [fromInNS, toInNs) nano time range
func (s *LogStore) GetLogLinesByNano(topic string, fromInNS int64, toInNs int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
var req GetLogRequest
req.Topic = topic
req.From = fromInNS / 1e9
req.To = toInNs / 1e9
req.FromNsPart = int32(fromInNS % 1e9)
req.ToNsPart = int32(toInNs % 1e9)
req.Query = queryExp
req.Lines = maxLineNum
req.Offset = offset
req.Reverse = reverse
return s.GetLogLinesV2(&req)
}
// GetLogLinesV2 query logs with [from, to) time range
func (s *LogStore) GetLogLinesV2(req *GetLogRequest) (*GetLogLinesResponse, error) {
v3Rsp, httpRsp, err := s.getLogsV3Internal(req)
if err != nil {
return nil, err
}
// ensured no error
data, _ := json.Marshal(&v3Rsp.Logs)
var logs []json.RawMessage
_ = json.Unmarshal(data, &logs)
v2Rsp, err := toLogRespV2(v3Rsp, httpRsp.Header)
if err != nil {
return nil, err
}
lineRsp := GetLogLinesResponse{
GetLogsResponse: *v2Rsp,
Lines: logs,
}
return &lineRsp, nil
}
// GetLogs query logs with [from, to) time range
func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
var req GetLogRequest
req.Topic = topic
req.From = from
req.To = to
req.Query = queryExp
req.Lines = maxLineNum
req.Offset = offset
req.Reverse = reverse
return s.GetLogsV2(&req)
}
func (s *LogStore) GetLogsByNano(topic string, fromInNS int64, toInNs int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
var req GetLogRequest
req.Topic = topic
req.From = fromInNS / 1e9
req.To = toInNs / 1e9
req.FromNsPart = int32(fromInNS % 1e9)
req.ToNsPart = int32(toInNs % 1e9)
req.Query = queryExp
req.Lines = maxLineNum
req.Offset = offset
req.Reverse = reverse
return s.GetLogsV2(&req)
}
func (s *LogStore) getToCompleted(f func() (bool, error)) {
interval := 100 * time.Millisecond
retryCount := MaxCompletedRetryCount
isCompleted := false
timeoutTime := time.Now().Add(MaxCompletedRetryLatency)
for retryCount > 0 && timeoutTime.After(time.Now()) {
var err error
isCompleted, err = f()
if err != nil || isCompleted {
return
}
time.Sleep(interval)
retryCount--
if interval < 10*time.Second {
interval = interval * 2
}
if interval > 10*time.Second {
interval = 10 * time.Second
}
}
return
}
// GetLogsToCompleted query logs with [from, to) time range to completed
func (s *LogStore) GetLogsToCompleted(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
var res *GetLogsResponse
var err error
f := func() (bool, error) {
res, err = s.GetLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}
// GetLogsToCompletedV2 query logs with [from, to) time range to completed
func (s *LogStore) GetLogsToCompletedV2(req *GetLogRequest) (*GetLogsResponse, error) {
var res *GetLogsResponse
var err error
f := func() (bool, error) {
res, err = s.GetLogsV2(req)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}
// GetLogsToCompletedV3 query logs with [from, to) time range to completed
func (s *LogStore) GetLogsToCompletedV3(req *GetLogRequest) (*GetLogsV3Response, error) {
var res *GetLogsV3Response
var err error
f := func() (bool, error) {
res, err = s.GetLogsV3(req)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}
// GetHistogramsToCompleted query logs with [from, to) time range to completed
func (s *LogStore) GetHistogramsToCompleted(topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
var res *GetHistogramsResponse
var err error
f := func() (bool, error) {
res, err = s.GetHistograms(topic, from, to, queryExp)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}
func (s *LogStore) GetHistogramsToCompletedV2(ghr *GetHistogramRequest) (*GetHistogramsResponse, error) {
var res *GetHistogramsResponse
var err error
f := func() (bool, error) {
res, err = s.GetHistogramsV2(ghr)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}
// GetLogsV2 query logs with [from, to) time range
func (s *LogStore) GetLogsV2(req *GetLogRequest) (*GetLogsResponse, error) {
resp, httpRsp, err := s.getLogsV3Internal(req)
if err != nil {
return nil, err
}
return toLogRespV2(resp, httpRsp.Header)
}
func toLogRespV2(v3Resp *GetLogsV3Response, respHeader http.Header) (*GetLogsResponse, error) {
queryInfo, err := v3Resp.Meta.constructQueryInfo()
if err != nil {
return nil, fmt.Errorf("fail to construct x-log-query-info: %w", err)
}
convertToLogRespV2Header(v3Resp, respHeader, queryInfo)
return &GetLogsResponse{
Logs: v3Resp.Logs,
Progress: v3Resp.Meta.Progress,
Count: v3Resp.Meta.Count,
HasSQL: v3Resp.Meta.HasSQL,
Contents: queryInfo,
Header: respHeader,
}, nil
}
func convertToLogRespV2Header(v3Resp *GetLogsV3Response, header http.Header, queryInfo string) {
header.Add(GetLogsCountHeader, strconv.FormatInt(v3Resp.Meta.Count, 10))
header.Add(ProcessedRows, strconv.FormatInt(v3Resp.Meta.ProcessedRows, 10))
header.Add(ProgressHeader, v3Resp.Meta.Progress)
header.Add(ProcessedBytes, strconv.FormatInt(v3Resp.Meta.ProcessedBytes, 10))
header.Add(ElapsedMillisecond, strconv.FormatInt(v3Resp.Meta.ElapsedMillisecond, 10))
header.Add(HasSQLHeader, strconv.FormatBool(v3Resp.Meta.HasSQL))
header.Add(TelemetryType, v3Resp.Meta.TelemetryType)
header.Add(WhereQuery, v3Resp.Meta.WhereQuery)
header.Add(AggQuery, v3Resp.Meta.AggQuery)
header.Add(CpuSec, strconv.FormatFloat(v3Resp.Meta.CpuSec, 'E', -1, 64))
header.Add(CpuCores, strconv.FormatFloat(v3Resp.Meta.CpuCores, 'E', -1, 64))
header.Add(PowerSql, strconv.FormatBool(v3Resp.Meta.PowerSql))
header.Add(InsertedSql, v3Resp.Meta.InsertedSql)
header.Add(GetLogsQueryInfo, queryInfo)
}
// GetLogsV3 query logs with [from, to) time range
func (s *LogStore) GetLogsV3(req *GetLogRequest) (*GetLogsV3Response, error) {
result, _, err := s.getLogsV3Internal(req)
if err != nil {
return nil, err
}
return result, nil
}
func (s *LogStore) getLogsV3Internal(req *GetLogRequest) (*GetLogsV3Response, *http.Response, error) {
reqBody, err := json.Marshal(req)
if err != nil {
return nil, nil, err
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(reqBody)),
"Content-Type": "application/json",
"Accept-Encoding": "lz4",
}
uri := fmt.Sprintf("/logstores/%s/logs", s.Name)
r, err := request(s.project, "POST", uri, h, reqBody)
if err != nil {
return nil, nil, NewClientError(err)
}
defer r.Body.Close()
respBody, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, nil, readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(respBody, err); jErr != nil {
return nil, nil, NewBadResponseError(string(respBody), r.Header, r.StatusCode)
}
return nil, nil, err
}
if _, ok := r.Header[BodyRawSize]; ok {
if len(r.Header[BodyRawSize]) > 0 {
bodyRawSize, err := strconv.ParseInt(r.Header[BodyRawSize][0], 10, 64)
if err != nil {
return nil, nil, NewBadResponseError(string(respBody), r.Header, r.StatusCode)
}
out := make([]byte, bodyRawSize)
if bodyRawSize != 0 {
len, err := lz4.UncompressBlock(respBody, out)
if err != nil || int64(len) != bodyRawSize {
return nil, nil, NewBadResponseError(string(respBody), r.Header, r.StatusCode)
}
}
respBody = out
}
}
var result GetLogsV3Response
if err = json.Unmarshal(respBody, &result); err != nil {
return nil, nil, NewBadResponseError(string(respBody), r.Header, r.StatusCode)
}
return &result, r, nil
}
// GetContextLogs ...
func (s *LogStore) GetContextLogs(backLines int32, forwardLines int32,
packID string, packMeta string) (*GetContextLogsResponse, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/json",
}
urlVal := url.Values{}
urlVal.Add("type", "context_log")
urlVal.Add("back_lines", strconv.Itoa(int(backLines)))
urlVal.Add("forward_lines", strconv.Itoa(int(forwardLines)))
urlVal.Add("pack_id", packID)
urlVal.Add("pack_meta", packMeta)
uri := fmt.Sprintf("/logstores/%s?%s", s.Name, urlVal.Encode())
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, readResponseError(err)
}
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(buf, err); jErr != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return nil, err
}
resp := GetContextLogsResponse{}
err = json.Unmarshal(buf, &resp)
if err != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return &resp, nil
}
// CreateIndex ...
func (s *LogStore) CreateIndex(index Index) error {
body, err := json.Marshal(index)
if err != nil {
return err
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "POST", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}
// CreateIndexString ...
func (s *LogStore) CreateIndexString(indexStr string) error {
body := []byte(indexStr)
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "POST", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}
// UpdateIndex ...
func (s *LogStore) UpdateIndex(index Index) error {
body, err := json.Marshal(index)
if err != nil {
return err
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "PUT", uri, h, body)
if r != nil {
r.Body.Close()
}
return err
}
// UpdateIndexString ...
func (s *LogStore) UpdateIndexString(indexStr string) error {
body := []byte(indexStr)
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "PUT", uri, h, body)
if r != nil {
r.Body.Close()
}
return err
}
// DeleteIndex ...
func (s *LogStore) DeleteIndex() error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "DELETE", uri, h, nil)
if r != nil {
r.Body.Close()
}
return err
}
// GetIndex ...
func (s *LogStore) GetIndex() (*Index, error) {
h := map[string]string{
"Content-Type": "application/json",
"x-log-bodyrawsize": "0",
"Accept-Encoding": "deflate",
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
index := &Index{}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, readResponseError(err)
}
err = json.Unmarshal(buf, index)
if err != nil {
return nil, NewBadResponseError(string(buf), r.Header, r.StatusCode)
}
return index, nil
}
// GetIndexString ...
func (s *LogStore) GetIndexString() (string, error) {
h := map[string]string{
"Content-Type": "application/json",
"x-log-bodyrawsize": "0",
"Accept-Encoding": "deflate",
}
uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return "", err
}
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
return string(data), err
}
// CheckIndexExist check index exist or not
func (s *LogStore) CheckIndexExist() (bool, error) {
if _, err := s.GetIndex(); err != nil {
if slsErr, ok := err.(*Error); ok {
if slsErr.Code == "IndexConfigNotExist" {
return false, nil
}
return false, slsErr
}
return false, err
}
return true, nil
}
func (s *LogStore) GetMeteringMode() (*GetMeteringModeResponse, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := fmt.Sprintf("/logstores/%s/meteringmode", s.Name)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, NewBadResponseError("", r.Header, r.StatusCode)
}
res := GetMeteringModeResponse{}
err = json.Unmarshal(data, &res)
if err != nil {
return nil, NewBadResponseError(string(data), r.Header, r.StatusCode)
}
return &res, nil
}
func (s *LogStore) UpdateMeteringMode(meteringMode string) error {
body := map[string]string{
"meteringMode": meteringMode,
}
uri := fmt.Sprintf("/logstores/%s/meteringmode", s.Name)
requestBody, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("cant marshal body:%w", err)
}
h := map[string]string{
"Content-Type": "application/json",
"x-log-bodyrawsize": strconv.Itoa(len(requestBody)),
}
r, err := request(s.project, "PUT", uri, h, requestBody)
if err != nil {
return err
}
defer r.Body.Close()
return nil
}