client_consumer.go (193 lines of code) (raw):

package sls import ( "encoding/json" "fmt" "io/ioutil" "net/http" "net/http/httputil" "net/url" "github.com/go-kit/kit/log/level" ) // ConsumerGroup type define type ConsumerGroup struct { ConsumerGroupName string `json:"consumerGroup"` Timeout int `json:"timeout"` // timeout seconds InOrder bool `json:"order"` } func (cg *ConsumerGroup) String() string { return fmt.Sprintf("[ConsumerGroupName: %s, Timeout: %d, InOrder: %t]", cg.ConsumerGroupName, cg.Timeout, cg.InOrder) } // ConsumerGroupCheckPoint type define type ConsumerGroupCheckPoint struct { ShardID int `json:"shard"` CheckPoint string `json:"checkpoint"` UpdateTime int64 `json:"updateTime"` Consumer string `json:"consumer"` } // CreateConsumerGroup ... func (c *Client) CreateConsumerGroup(project, logstore string, cg ConsumerGroup) (err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } body, err := json.Marshal(cg) if err != nil { return err } uri := fmt.Sprintf("/logstores/%v/consumergroups", logstore) _, err = c.request(project, "POST", uri, h, body) if err != nil { return NewClientError(err) } return nil } // UpdateConsumerGroup ... func (c *Client) UpdateConsumerGroup(project, logstore string, cg ConsumerGroup) (err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } updates := make(map[string]interface{}) updates["order"] = cg.InOrder if cg.Timeout > 0 { updates["timeout"] = cg.Timeout } body, err := json.Marshal(updates) if err != nil { return err } uri := fmt.Sprintf("/logstores/%v/consumergroups/%v", logstore, cg.ConsumerGroupName) _, err = c.request(project, "PUT", uri, h, body) if err != nil { return NewClientError(err) } return nil } // DeleteConsumerGroup ... func (c *Client) DeleteConsumerGroup(project, logstore string, cgName string) (err error) { h := map[string]string{ "x-log-bodyrawsize": "0", } uri := fmt.Sprintf("/logstores/%v/consumergroups/%v", logstore, cgName) _, err = c.request(project, "DELETE", uri, h, nil) if err != nil { return NewClientError(err) } return nil } // ListConsumerGroup ... func (c *Client) ListConsumerGroup(project, logstore string) (cgList []*ConsumerGroup, err error) { h := map[string]string{ "x-log-bodyrawsize": "0", } uri := fmt.Sprintf("/logstores/%v/consumergroups", logstore) r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, NewClientError(err) } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, NewClientError(err) } if r.StatusCode != http.StatusOK { errMsg := &Error{} err = json.Unmarshal(buf, errMsg) if err != nil { err = fmt.Errorf("failed to split shards") if IsDebugLevelMatched(5) { dump, _ := httputil.DumpResponse(r, true) level.Error(Logger).Log("msg", string(dump)) } return nil, NewClientError(err) } return nil, errMsg } type getConsumerGroup struct { ConsumerGroupName string `json:"name"` // for getConsumerGroup, this is "name" Timeout int `json:"timeout"` // timeout seconds InOrder bool `json:"order"` } var cgListRaw []*getConsumerGroup err = json.Unmarshal(buf, &cgListRaw) for _, rawCg := range cgListRaw { cgList = append(cgList, &ConsumerGroup{ ConsumerGroupName: rawCg.ConsumerGroupName, Timeout: rawCg.Timeout, InOrder: rawCg.InOrder, }) } return } // HeartBeat ... func (c *Client) HeartBeat(project, logstore string, cgName, consumer string, heartBeatShardIDs []int) (shardIDs []int, err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } if heartBeatShardIDs == nil { heartBeatShardIDs = []int{} } body, err := json.Marshal(heartBeatShardIDs) if err != nil { return nil, NewClientError(err) } urlVal := url.Values{} urlVal.Add("type", "heartbeat") urlVal.Add("consumer", consumer) uri := fmt.Sprintf("/logstores/%v/consumergroups/%v?%v", logstore, cgName, urlVal.Encode()) r, err := c.request(project, "POST", uri, h, body) if err != nil { return nil, NewClientError(err) } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, readResponseError(err) } var shards []int err = json.Unmarshal(buf, &shards) if err != nil { return nil, invalidJsonRespError(string(buf), r.Header, r.StatusCode) } shardIDs = append(shardIDs, shards...) return shardIDs, nil } // UpdateCheckpoint ... func (c *Client) UpdateCheckpoint(project, logstore string, cgName string, consumer string, shardID int, checkpoint string, forceSuccess bool) (err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } b := map[string]interface{}{ "shard": shardID, "checkpoint": checkpoint, } body, err := json.Marshal(b) if err != nil { return NewClientError(err) } urlVal := url.Values{} urlVal.Add("type", "checkpoint") urlVal.Add("consumer", consumer) if forceSuccess { urlVal.Add("forceSuccess", "true") } else { urlVal.Add("forceSuccess", "false") } // fmt.Println(urlVal.Encode()) uri := fmt.Sprintf("/logstores/%v/consumergroups/%v?%v", logstore, cgName, urlVal.Encode()) _, err = c.request(project, "POST", uri, h, body) if err != nil { return NewClientError(err) } return nil } // GetCheckpoint ... func (c *Client) GetCheckpoint(project, logstore string, cgName string) (checkPointList []*ConsumerGroupCheckPoint, err error) { h := map[string]string{ "x-log-bodyrawsize": "0", } uri := fmt.Sprintf("/logstores/%v/consumergroups/%v", logstore, cgName) r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, NewClientError(err) } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, readResponseError(err) } err = json.Unmarshal(buf, &checkPointList) return }