func()

in runtime/core/protocol/grpc/consumer/consumer_manager.go [164:203]


// clientCheck starts a background goroutine that periodically evicts clients
// whose last update time is older than the configured gRPC session expiry.
//
// The configured value (Server.GRPCOption.SessionExpiredInMills) is used both
// as the sweep interval and as the staleness threshold. time.NewTicker panics
// if that value is not positive.
//
// NOTE(review): the ticker is never stopped and the goroutine has no exit
// path, so it runs until the process ends. Tying it to a shutdown signal would
// require a change outside this function.
func (c *consumerManager) clientCheck() {
	sessionExpiredInMills := config2.GlobalConfig().Server.GRPCOption.SessionExpiredInMills
	tk := time.NewTicker(sessionExpiredInMills)
	go func() {
		for range tk.C {
			c.sweepExpiredClients(sessionExpiredInMills)
		}
	}()
}

// sweepExpiredClients performs a single eviction pass over every client set in
// consumerGroupClients. A client is considered expired when the time elapsed
// since its LastUPTime exceeds expiry. Each expired client is deregistered from
// the manager and from its eventmesh consumer; every consumer group that lost
// at least one client is then restarted exactly once.
//
// Failures are logged and never propagated: a failure while evicting a client
// abandons the rest of that client set for this pass (it is retried on the
// next pass), and a failed restart does not prevent the remaining restarts.
func (c *consumerManager) sweepExpiredClients(expiry time.Duration) {
	// Groups to restart, in first-seen order. The restart list is scoped to the
	// whole pass and de-duplicated so that a group is not restarted once per
	// expired client, nor again while later client sets are being visited.
	var restartGroups []string
	queued := make(map[string]struct{})

	c.consumerGroupClients.Range(func(_, value any) bool {
		localClients := value.(*set.Set)

		// Snapshot the expired clients before deregistering anything.
		// NOTE(review): DeRegisterClient is defined elsewhere; presumably it
		// removes the client from this same set, which would make erasing
		// while iterating unsafe — confirm against its implementation.
		var expired []*GroupClient
		for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
			lc := iter.Value().(*GroupClient)
			if time.Since(lc.LastUPTime) > expiry {
				expired = append(expired, lc)
			}
		}

		for _, lc := range expired {
			log.Warnf("client:%v lastUpdate time:%v over three heartbeat cycles. Removing it",
				lc.ConsumerGroup, lc.LastUPTime)
			emconsumer, err := c.GetConsumer(lc.ConsumerGroup)
			if err != nil {
				log.Warnf("get eventmesh consumer:%v failed, err:%v", lc.ConsumerGroup, err)
				return true
			}
			if err := c.DeRegisterClient(lc); err != nil {
				log.Warnf("deregistry client:%v err:%v", lc.ConsumerGroup, err)
				return true
			}
			if ok := emconsumer.DeRegisterClient(lc); !ok {
				log.Warnf("failed deregistry client:%v in eventmesh consumer", lc.ConsumerGroup)
				return true
			}
			if _, seen := queued[lc.ConsumerGroup]; !seen {
				queued[lc.ConsumerGroup] = struct{}{}
				restartGroups = append(restartGroups, lc.ConsumerGroup)
			}
		}
		return true
	})

	// Restart after the traversal has finished so each affected group is
	// restarted once, regardless of how many client sets were visited.
	for _, group := range restartGroups {
		if err := c.RestartConsumer(group); err != nil {
			log.Warnf("restart consumer:%v err:%v", group, err)
		}
	}
}