client_store.go (478 lines of code) (raw):
package sls
import (
base64E "encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"time"
"github.com/go-kit/kit/log/level"
)
func convertLogstore(c *Client, project, logstore string) *LogStore {
c.accessKeyLock.RLock()
proj := convertLocked(c, project)
c.accessKeyLock.RUnlock()
return &LogStore{
project: proj,
Name: logstore,
}
}
// ListShards returns shard id list of this logstore.
func (c *Client) ListShards(project, logstore string) (shardIDs []*Shard, err error) {
ls := convertLogstore(c, project, logstore)
return ls.ListShards()
}
// SplitShard https://help.aliyun.com/document_detail/29021.html
func (c *Client) SplitShard(project, logstore string, shardID int, splitKey string) (shards []*Shard, err error) {
return c.splitShard(project, logstore, shardID, 0, splitKey)
}
// SplitNumShard https://help.aliyun.com/document_detail/29021.html
func (c *Client) SplitNumShard(project, logstore string, shardID, shardsNum int) (shards []*Shard, err error) {
return c.splitShard(project, logstore, shardID, shardsNum, "")
}
func (c *Client) splitShard(project, logstore string, shardID, shardsNum int, splitKey string) (shards []*Shard, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
urlVal := url.Values{}
urlVal.Add("action", "split")
if splitKey != "" {
urlVal.Add("key", splitKey)
}
if shardsNum > 0 {
urlVal.Add("shardCount", strconv.Itoa(shardsNum))
}
uri := fmt.Sprintf("/logstores/%v/shards/%v?%v", logstore, shardID, urlVal.Encode())
r, err := c.request(project, "POST", uri, h, nil)
if err != nil {
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, NewClientError(err)
}
err = json.Unmarshal(buf, &shards)
return
}
// MergeShards https://help.aliyun.com/document_detail/29022.html
func (c *Client) MergeShards(project, logstore string, shardID int) (shards []*Shard, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
urlVal := url.Values{}
urlVal.Add("action", "merge")
uri := fmt.Sprintf("/logstores/%v/shards/%v?%v", logstore, shardID, urlVal.Encode())
r, err := c.request(project, "POST", uri, h, nil)
if err != nil {
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, NewClientError(err)
}
err = json.Unmarshal(buf, &shards)
return
}
// PutLogs put logs into logstore.
// The callers should transform user logs into LogGroup.
func (c *Client) PutLogs(project, logstore string, lg *LogGroup) (err error) {
ls := convertLogstore(c, project, logstore)
return ls.PutLogs(lg)
}
// PostLogStoreLogs put logs into Shard logstore by hashKey.
// The callers should transform user logs into LogGroup.
func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKey *string) (err error) {
ls := convertLogstore(c, project, logstore)
req := &PostLogStoreLogsRequest{
LogGroup: lg,
HashKey: hashKey,
}
return ls.PostLogStoreLogs(req)
}
func (c *Client) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) {
ls := convertLogstore(c, project, logstore)
ls.useMetricStoreURL = true
return ls.PutLogs(lg)
}
func (c *Client) PostLogStoreLogsV2(project, logstore string, req *PostLogStoreLogsRequest) (err error) {
ls := convertLogstore(c, project, logstore)
return ls.PostLogStoreLogs(req)
}
// PostRawLogWithCompressType put raw log data to log service, no marshal
func (c *Client) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) {
ls := convertLogstore(c, project, logstore)
if err := ls.SetPutLogCompressType(compressType); err != nil {
return err
}
return ls.PostRawLogs(rawLogData, hashKey)
}
// PutLogsWithCompressType put logs into logstore with specific compress type.
// The callers should transform user logs into LogGroup.
func (c *Client) PutLogsWithCompressType(project, logstore string, lg *LogGroup, compressType int) (err error) {
ls := convertLogstore(c, project, logstore)
if err := ls.SetPutLogCompressType(compressType); err != nil {
return err
}
return ls.PutLogs(lg)
}
// PutRawLogWithCompressType put raw log data to log service, no marshal
func (c *Client) PutRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int) (err error) {
ls := convertLogstore(c, project, logstore)
if err := ls.SetPutLogCompressType(compressType); err != nil {
return err
}
return ls.PutRawLog(rawLogData)
}
// GetCursor gets log cursor of one shard specified by shardId.
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
// For more detail please read: https://help.aliyun.com/document_detail/29024.html
func (c *Client) GetCursor(project, logstore string, shardID int, from string) (cursor string, err error) {
ls := convertLogstore(c, project, logstore)
return ls.GetCursor(shardID, from)
}
// GetCursorTime ...
func (c *Client) GetCursorTime(project, logstore string, shardID int, cursor string) (cursorTime time.Time, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
urlVal := url.Values{}
urlVal.Add("cursor", cursor)
urlVal.Add("type", "cursor_time")
uri := fmt.Sprintf("/logstores/%v/shards/%v?%v", logstore, shardID, urlVal.Encode())
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return cursorTime, NewClientError(err)
}
type getCursorResult struct {
CursorTime int `json:"cursor_time"`
}
var rst getCursorResult
err = json.Unmarshal(buf, &rst)
return time.Unix(int64(rst.CursorTime), 0), err
}
// GetPrevCursorTime ...
func (c *Client) GetPrevCursorTime(project, logstore string, shardID int, cursor string) (cursorTime time.Time, err error) {
realCursor, err := base64E.StdEncoding.DecodeString(cursor)
if err != nil {
return cursorTime, NewClientError(err)
}
cursorVal, err := strconv.Atoi(string(realCursor))
if err != nil {
return cursorTime, NewClientError(err)
}
cursorVal--
nextCursor := base64E.StdEncoding.EncodeToString([]byte(strconv.Itoa(cursorVal)))
return c.GetCursorTime(project, logstore, shardID, nextCursor)
}
// GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (c *Client) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
}
func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesV2(plr)
}
func (c *Client) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesWithQuery(plr)
}
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next cursor can be used to read logs at next time.
// @note if you want to pull logs continuous, set endCursor = ""
func (c *Client) PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
ls := convertLogstore(c, project, logstore)
return ls.PullLogs(shardID, cursor, endCursor, logGroupMaxCount)
}
func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsV2(plr)
}
func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsWithQuery(plr)
}
// GetHistograms query logs with [from, to) time range
func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetHistograms(topic, from, to, queryExp)
}
func (c *Client) GetHistogramsV2(project, logstore string, ghr *GetHistogramRequest) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetHistogramsV2(ghr)
}
// GetHistogramsToCompleted query logs with [from, to) time range to completed
func (c *Client) GetHistogramsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetHistogramsToCompleted(topic, from, to, queryExp)
}
func (c *Client) GetHistogramsToCompletedV2(project, logstore string, ghr *GetHistogramRequest) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetHistogramsToCompletedV2(ghr)
}
// GetLogs query logs with [from, to) time range
func (c *Client) GetLogs(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
}
func (c *Client) GetLogsByNano(project, logstore string, topic string, fromInNs int64, toInNs int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsByNano(topic, fromInNs, toInNs, queryExp, maxLineNum, offset, reverse)
}
// GetLogsToCompleted query logs with [from, to) time range to completed
func (c *Client) GetLogsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsToCompleted(topic, from, to, queryExp, maxLineNum, offset, reverse)
}
// GetLogLines ...
func (c *Client) GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogLines(topic, from, to, queryExp, maxLineNum, offset, reverse)
}
func (c *Client) GetLogLinesByNano(project, logstore string, topic string, fromInNs int64, toInNs int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogLinesByNano(topic, fromInNs, toInNs, queryExp, maxLineNum, offset, reverse)
}
// GetLogsV2 ...
func (c *Client) GetLogsV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsV2(req)
}
// GetLogsV3 ...
func (c *Client) GetLogsV3(project, logstore string, req *GetLogRequest) (*GetLogsV3Response, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsV3(req)
}
// GetLogsToCompletedV2 ...
func (c *Client) GetLogsToCompletedV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsToCompletedV2(req)
}
// GetLogsToCompletedV3 ...
func (c *Client) GetLogsToCompletedV3(project, logstore string, req *GetLogRequest) (*GetLogsV3Response, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsToCompletedV3(req)
}
// GetLogLinesV2 ...
func (c *Client) GetLogLinesV2(project, logstore string, req *GetLogRequest) (*GetLogLinesResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogLinesV2(req)
}
// CreateIndex ...
func (c *Client) CreateIndex(project, logstore string, index Index) error {
ls := convertLogstore(c, project, logstore)
return ls.CreateIndex(index)
}
// UpdateIndex ...
func (c *Client) UpdateIndex(project, logstore string, index Index) error {
ls := convertLogstore(c, project, logstore)
return ls.UpdateIndex(index)
}
// GetIndex ...
func (c *Client) GetIndex(project, logstore string) (*Index, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetIndex()
}
// CreateIndexString ...
func (c *Client) CreateIndexString(project, logstore string, index string) error {
ls := convertLogstore(c, project, logstore)
return ls.CreateIndexString(index)
}
// UpdateIndexString ...
func (c *Client) UpdateIndexString(project, logstore string, index string) error {
ls := convertLogstore(c, project, logstore)
return ls.UpdateIndexString(index)
}
// GetIndexString ...
func (c *Client) GetIndexString(project, logstore string) (string, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetIndexString()
}
// DeleteIndex ...
func (c *Client) DeleteIndex(project, logstore string) error {
ls := convertLogstore(c, project, logstore)
return ls.DeleteIndex()
}
// ListSubStore ...
func (c *Client) ListSubStore(project, logstore string) (sortedSubStores []string, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%v/substores", logstore)
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &Error{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
if IsDebugLevelMatched(1) {
dump, _ := httputil.DumpResponse(r, true)
level.Error(Logger).Log("msg", string(dump))
}
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type sortedSubStoreList struct {
SubStores []string `json:"substores"`
}
body := &sortedSubStoreList{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
sortedSubStores = body.SubStores
return
}
// GetSubStore ...
func (c *Client) GetSubStore(project, logstore, name string) (sortedSubStore *SubStore, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%s/substores/%s", logstore, name)
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &Error{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
if IsDebugLevelMatched(1) {
dump, _ := httputil.DumpResponse(r, true)
level.Error(Logger).Log("msg", string(dump))
}
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
sortedSubStore = &SubStore{}
err = json.Unmarshal(buf, sortedSubStore)
if err != nil {
sortedSubStore = nil
return
}
return
}
// CreateSubStore ...
func (c *Client) CreateSubStore(project, logstore string, sss *SubStore) (err error) {
body, err := json.Marshal(sss)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate",
}
r, err := c.request(project, "POST", fmt.Sprintf("/logstores/%s/substores", logstore), h, body)
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
return httpStatusNotOkError(buf, r.Header, r.StatusCode)
}
return
}
// UpdateSubStore ...
func (c *Client) UpdateSubStore(project, logstore string, sss *SubStore) (err error) {
body, err := json.Marshal(sss)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate",
}
r, err := c.request(project, "PUT", fmt.Sprintf("/logstores/%s/substores/%s", logstore, sss.Name), h, body)
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
return httpStatusNotOkError(buf, r.Header, r.StatusCode)
}
return
}
// DeleteSubStore ...
func (c *Client) DeleteSubStore(project, logstore string, name string) (err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
r, err := c.request(project, "DELETE", fmt.Sprintf("/logstores/%s/substores/%s", logstore, name), h, nil)
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
return httpStatusNotOkError(buf, r.Header, r.StatusCode)
}
return
}
// GetSubStoreTTL ...
func (c *Client) GetSubStoreTTL(project, logstore string) (ttl int, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%s/substores/storage/ttl", logstore)
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &Error{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
if IsDebugLevelMatched(1) {
dump, _ := httputil.DumpResponse(r, true)
level.Error(Logger).Log("msg", string(dump))
}
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type ttlDef struct {
TTL int `json:"ttl"`
}
var ttlIns ttlDef
err = json.Unmarshal(buf, &ttlIns)
if err != nil {
return
}
return ttlIns.TTL, err
}
// UpdateSubStoreTTL ...
func (c *Client) UpdateSubStoreTTL(project, logstore string, ttl int) (err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
}
r, err := c.request(project, "PUT", fmt.Sprintf("/logstores/%s/substores/storage/ttl?ttl=%d", logstore, ttl), h, nil)
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return readResponseError(err)
}
if r.StatusCode != http.StatusOK {
return httpStatusNotOkError(buf, r.Header, r.StatusCode)
}
return
}