in consumer/checkpoint_tracker.go [85:120]
func (tracker *DefaultCheckPointTracker) flushCheckPoint() error {
if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint {
return nil
}
for i := 0; ; i++ {
err := tracker.client.updateCheckPoint(tracker.shardId, tracker.pendingCheckPoint, true)
if err == nil {
break
}
slsErr, ok := err.(*sls.Error)
if ok {
if strings.EqualFold(slsErr.Code, "ConsumerNotExsit") || strings.EqualFold(slsErr.Code, "ConsumerNotMatch") {
tracker.heartBeat.removeHeartShard(tracker.shardId)
level.Warn(tracker.logger).Log("msg", "consumer has been removed or shard has been reassigned", "shard", tracker.shardId, "err", slsErr)
break
} else if strings.EqualFold(slsErr.Code, "ShardNotExsit") {
tracker.heartBeat.removeHeartShard(tracker.shardId)
level.Warn(tracker.logger).Log("msg", "shard does not exist", "shard", tracker.shardId)
break
}
}
if i >= 2 {
level.Error(tracker.logger).Log(
"msg", "failed to save checkpoint",
"consumer", tracker.client.option.ConsumerName,
"shard", tracker.shardId,
"checkpoint", tracker.pendingCheckPoint,
)
return err
}
time.Sleep(100 * time.Millisecond)
}
tracker.savedCheckPoint = tracker.pendingCheckPoint
return nil
}