func()

in consumer/checkpoint_tracker.go [85:120]


func (tracker *DefaultCheckPointTracker) flushCheckPoint() error {
	if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint {
		return nil
	}
	for i := 0; ; i++ {
		err := tracker.client.updateCheckPoint(tracker.shardId, tracker.pendingCheckPoint, true)
		if err == nil {
			break
		}
		slsErr, ok := err.(*sls.Error)
		if ok {
			if strings.EqualFold(slsErr.Code, "ConsumerNotExsit") || strings.EqualFold(slsErr.Code, "ConsumerNotMatch") {
				tracker.heartBeat.removeHeartShard(tracker.shardId)
				level.Warn(tracker.logger).Log("msg", "consumer has been removed or shard has been reassigned", "shard", tracker.shardId, "err", slsErr)
				break
			} else if strings.EqualFold(slsErr.Code, "ShardNotExsit") {
				tracker.heartBeat.removeHeartShard(tracker.shardId)
				level.Warn(tracker.logger).Log("msg", "shard does not exist", "shard", tracker.shardId)
				break
			}
		}
		if i >= 2 {
			level.Error(tracker.logger).Log(
				"msg", "failed to save checkpoint",
				"consumer", tracker.client.option.ConsumerName,
				"shard", tracker.shardId,
				"checkpoint", tracker.pendingCheckPoint,
			)
			return err
		}
		time.Sleep(100 * time.Millisecond)
	}

	tracker.savedCheckPoint = tracker.pendingCheckPoint
	return nil
}