consumer/consumer_client.go (163 lines of code) (raw):
package consumerLibrary
import (
"fmt"
"time"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)
type ConsumerClient struct {
option LogHubConfig
client sls.ClientInterface
consumerGroup sls.ConsumerGroup
logger log.Logger
}
func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient {
// Setting configuration defaults
if option.HeartbeatIntervalInSecond == 0 {
option.HeartbeatIntervalInSecond = 20
}
if option.HeartbeatTimeoutInSecond == 0 {
option.HeartbeatTimeoutInSecond = option.HeartbeatIntervalInSecond * 3
}
if option.DataFetchIntervalInMs == 0 {
option.DataFetchIntervalInMs = 200
}
if option.MaxFetchLogGroupCount == 0 {
option.MaxFetchLogGroupCount = 1000
}
if option.AutoCommitIntervalInMS == 0 {
option.AutoCommitIntervalInMS = 60 * 1000
}
var client sls.ClientInterface
if option.CredentialsProvider != nil {
client = sls.CreateNormalInterfaceV2(option.Endpoint, option.CredentialsProvider)
} else {
client = sls.CreateNormalInterface(option.Endpoint,
option.AccessKeyID,
option.AccessKeySecret,
option.SecurityToken)
}
client.SetUserAgent(option.ConsumerGroupName + "_" + option.ConsumerName)
if option.HTTPClient != nil {
client.SetHTTPClient(option.HTTPClient)
}
if option.AuthVersion != "" {
client.SetAuthVersion(option.AuthVersion)
}
if option.Region != "" {
client.SetRegion(option.Region)
}
consumerGroup := sls.ConsumerGroup{
ConsumerGroupName: option.ConsumerGroupName,
Timeout: option.HeartbeatTimeoutInSecond,
InOrder: option.InOrder,
}
consumerClient := &ConsumerClient{
option,
client,
consumerGroup,
logger,
}
return consumerClient
}
func (consumer *ConsumerClient) createConsumerGroup() error {
consumerGroups, err := consumer.client.ListConsumerGroup(consumer.option.Project, consumer.option.Logstore)
if err != nil {
return fmt.Errorf("list consumer group failed: %w", err)
}
alreadyExist := false
for _, cg := range consumerGroups {
if cg.ConsumerGroupName == consumer.consumerGroup.ConsumerGroupName {
alreadyExist = true
if (*cg) != consumer.consumerGroup {
level.Info(consumer.logger).Log("msg", "this config is different from original config, try to override it", "old_config", cg)
} else {
level.Info(consumer.logger).Log("msg", "new consumer join the consumer group", "consumer name", consumer.option.ConsumerName,
"group name", consumer.option.ConsumerGroupName)
return nil
}
}
}
if alreadyExist {
if err := consumer.client.UpdateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
return fmt.Errorf("update consumer group failed: %w", err)
}
} else {
if err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
if slsError, ok := err.(*sls.Error); !ok || slsError.Code != "ConsumerGroupAlreadyExist" {
return fmt.Errorf("create consumer group failed: %w", err)
}
}
}
return nil
}
func (consumer *ConsumerClient) heartBeat(heart []int) ([]int, error) {
heldShard, err := consumer.client.HeartBeat(consumer.option.Project, consumer.option.Logstore, consumer.option.ConsumerGroupName, consumer.option.ConsumerName, heart)
return heldShard, err
}
func (consumer *ConsumerClient) updateCheckPoint(shardId int, checkpoint string, forceSucess bool) error {
return consumer.client.UpdateCheckpoint(consumer.option.Project, consumer.option.Logstore, consumer.option.ConsumerGroupName, consumer.option.ConsumerName, shardId, checkpoint, forceSucess)
}
// get a single shard checkpoint, if not,return ""
func (consumer *ConsumerClient) getCheckPoint(shardId int) (checkpoint string, err error) {
checkPonitList := []*sls.ConsumerGroupCheckPoint{}
for retry := 0; retry < 3; retry++ {
checkPonitList, err = consumer.client.GetCheckpoint(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup.ConsumerGroupName)
if err != nil {
level.Info(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", err)
time.Sleep(1 * time.Second)
} else {
break
}
}
if err != nil {
return "", err
}
for _, checkPoint := range checkPonitList {
if checkPoint.ShardID == shardId {
return checkPoint.CheckPoint, nil
}
}
return "", err
}
func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, error) {
cursor, err := consumer.client.GetCursor(consumer.option.Project, consumer.option.Logstore, shardId, from)
return cursor, err
}
func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, plm *sls.PullLogMeta, err error) {
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
ShardID: shardId,
Cursor: cursor,
Query: consumer.option.Query,
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
CompressType: consumer.option.CompressType,
}
for retry := 0; retry < 3; retry++ {
gl, plm, err = consumer.client.PullLogsWithQuery(plr)
if err != nil {
slsError, ok := err.(*sls.Error)
if ok {
level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error",
"shard", shardId,
"error", slsError,
"tryTimes", retry+1,
"cursor", cursor,
)
if slsError.HTTPCode == 403 {
time.Sleep(5 * time.Second)
}
} else {
level.Warn(consumer.logger).Log("msg", "unknown error when pull log",
"shardId", shardId,
"cursor", cursor,
"error", err,
"tryTimes", retry+1)
}
time.Sleep(200 * time.Millisecond)
} else {
return gl, plm, nil
}
}
// If you can't retry the log three times, it will return to empty list and start pulling the log cursor,
// so that next time you will come in and pull the function again, which is equivalent to a dead cycle.
return
}