consumer/checkpoint_tracker.go (97 lines of code) (raw):
package consumerLibrary
import (
"strings"
"time"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)
// CheckPointTracker
// Generally, you just need SaveCheckPoint, if you use more funcs, make sure you understand these
type CheckPointTracker interface {
// GetCheckPoint get lastest saved check point
GetCheckPoint() string
// SaveCheckPoint, save next cursor to checkpoint
SaveCheckPoint(force bool) error
// GetCurrentCursor get current fetched data cursor
GetCurrentCursor() string
// GetNextCursor get next fetched data cursor(this is also the next checkpoint to be saved)
GetNextCursor() string
// GetShardId, return the id of shard tracked
GetShardId() int
}
type DefaultCheckPointTracker struct {
client *ConsumerClient
heartBeat *ConsumerHeartBeat
nextCursor string // cursor for already pulled data
currentCursor string // cursor for data processed, but may not be saved to server
pendingCheckPoint string // pending cursor to saved
savedCheckPoint string // already saved
shardId int
logger log.Logger
}
func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, consumerHeatBeat *ConsumerHeartBeat, logger log.Logger) *DefaultCheckPointTracker {
checkpointTracker := &DefaultCheckPointTracker{
client: consumerClient,
heartBeat: consumerHeatBeat,
shardId: shardId,
logger: logger,
}
return checkpointTracker
}
func (tracker *DefaultCheckPointTracker) initCheckPoint(cursor string) {
tracker.savedCheckPoint = cursor
}
func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error {
tracker.pendingCheckPoint = tracker.nextCursor
if force {
return tracker.flushCheckPoint()
}
return nil
}
func (tracker *DefaultCheckPointTracker) GetCheckPoint() string {
return tracker.savedCheckPoint
}
func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string {
return tracker.currentCursor
}
func (tracker *DefaultCheckPointTracker) setCurrentCursor(cursor string) {
tracker.currentCursor = cursor
}
func (tracker *DefaultCheckPointTracker) GetNextCursor() string {
return tracker.nextCursor
}
func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) {
tracker.nextCursor = cursor
}
func (tracker *DefaultCheckPointTracker) GetShardId() int {
return tracker.shardId
}
func (tracker *DefaultCheckPointTracker) flushCheckPoint() error {
if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint {
return nil
}
for i := 0; ; i++ {
err := tracker.client.updateCheckPoint(tracker.shardId, tracker.pendingCheckPoint, true)
if err == nil {
break
}
slsErr, ok := err.(*sls.Error)
if ok {
if strings.EqualFold(slsErr.Code, "ConsumerNotExsit") || strings.EqualFold(slsErr.Code, "ConsumerNotMatch") {
tracker.heartBeat.removeHeartShard(tracker.shardId)
level.Warn(tracker.logger).Log("msg", "consumer has been removed or shard has been reassigned", "shard", tracker.shardId, "err", slsErr)
break
} else if strings.EqualFold(slsErr.Code, "ShardNotExsit") {
tracker.heartBeat.removeHeartShard(tracker.shardId)
level.Warn(tracker.logger).Log("msg", "shard does not exist", "shard", tracker.shardId)
break
}
}
if i >= 2 {
level.Error(tracker.logger).Log(
"msg", "failed to save checkpoint",
"consumer", tracker.client.option.ConsumerName,
"shard", tracker.shardId,
"checkpoint", tracker.pendingCheckPoint,
)
return err
}
time.Sleep(100 * time.Millisecond)
}
tracker.savedCheckPoint = tracker.pendingCheckPoint
return nil
}