consumer/heart_beat.go (105 lines of code) (raw):

package consumerLibrary import ( "fmt" "sync" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "go.uber.org/atomic" ) type ConsumerHeartBeat struct { client *ConsumerClient shutDownFlag *atomic.Bool heldShards []int heartShards []int logger log.Logger lastHeartBeatSuccessTime int64 shardLock sync.RWMutex } func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *ConsumerHeartBeat { consumerHeartBeat := &ConsumerHeartBeat{ client: consumerClient, shutDownFlag: atomic.NewBool(false), heldShards: []int{}, heartShards: []int{}, logger: logger, lastHeartBeatSuccessTime: time.Now().Unix(), } return consumerHeartBeat } func (heartbeat *ConsumerHeartBeat) getHeldShards() []int { heartbeat.shardLock.RLock() defer heartbeat.shardLock.RUnlock() return append([]int{}, heartbeat.heldShards...) } func (heartbeat *ConsumerHeartBeat) setHeldShards(heldShards []int) { heartbeat.shardLock.Lock() defer heartbeat.shardLock.Unlock() heartbeat.heldShards = heldShards } func (heartbeat *ConsumerHeartBeat) getHeartShards() []int { heartbeat.shardLock.RLock() defer heartbeat.shardLock.RUnlock() return append([]int{}, heartbeat.heartShards...) } func (heartbeat *ConsumerHeartBeat) shutDownHeart() { level.Info(heartbeat.logger).Log("msg", "try to stop heart beat") heartbeat.shutDownFlag.Store(true) } func (heartbeat *ConsumerHeartBeat) updateHeartShard() { m := make(map[int]bool) heartbeat.shardLock.Lock() defer heartbeat.shardLock.Unlock() for _, shard := range heartbeat.heartShards { m[shard] = true } for _, shard := range heartbeat.heldShards { m[shard] = true } uploadShards := make([]int, 0, len(m)) for shard := range m { uploadShards = append(uploadShards, shard) } heartbeat.heartShards = uploadShards } func (heartbeat *ConsumerHeartBeat) heartBeatRun() { var lastHeartBeatTime int64 for !heartbeat.shutDownFlag.Load() { lastHeartBeatTime = time.Now().Unix() heartbeat.updateHeartShard() responseShards, err := heartbeat.client.heartBeat(heartbeat.getHeartShards()) if err != nil { level.Warn(heartbeat.logger).Log("msg", "send heartbeat error", "error", err) if time.Now().Unix()-heartbeat.lastHeartBeatSuccessTime > int64(heartbeat.client.consumerGroup.Timeout+heartbeat.client.option.HeartbeatIntervalInSecond) { heartbeat.setHeldShards([]int{}) level.Info(heartbeat.logger).Log("msg", "Heart beat timeout, automatic reset consumer held shards") } } else { heartbeat.lastHeartBeatSuccessTime = time.Now().Unix() level.Info(heartbeat.logger).Log("heart beat result", fmt.Sprintf("%v", heartbeat.heartShards), "get", fmt.Sprintf("%v", responseShards)) heartbeat.setHeldShards(responseShards) if !IntSliceReflectEqual(heartbeat.getHeartShards(), heartbeat.getHeldShards()) { currentSet := Set(heartbeat.getHeartShards()) responseSet := Set(heartbeat.getHeldShards()) add := Subtract(currentSet, responseSet) remove := Subtract(responseSet, currentSet) level.Info(heartbeat.logger).Log("shard reorganize, adding:", fmt.Sprintf("%v", add), "removing:", fmt.Sprintf("%v", remove)) } } TimeToSleepInSecond(int64(heartbeat.client.option.HeartbeatIntervalInSecond), lastHeartBeatTime, heartbeat.shutDownFlag.Load()) } level.Info(heartbeat.logger).Log("msg", "heart beat exit") } func (heartbeat *ConsumerHeartBeat) removeHeartShard(shardId int) bool { heartbeat.shardLock.Lock() defer heartbeat.shardLock.Unlock() isDeleteShard := false for i, heartShard := range heartbeat.heartShards { if shardId == heartShard { heartbeat.heartShards = append(heartbeat.heartShards[:i], heartbeat.heartShards[i+1:]...) isDeleteShard = true break } } return isDeleteShard }