in runtime/core/protocol/grpc/consumer/consumer_manager.go [164:203]
// clientCheck starts a background goroutine that, once per session-expiry
// interval, scans every registered group client and evicts those whose
// LastUPTime is older than that interval. An evicted client is deregistered
// from the manager and from its eventmesh consumer; every consumer group that
// lost at least one client is then restarted exactly once per tick.
//
// The interval is read once from the global config when clientCheck is called.
// time.NewTicker panics on a non-positive duration, so the configured value
// must be > 0. The ticker is never stopped, so the goroutine runs for the
// lifetime of the process.
func (c *consumerManager) clientCheck() {
	sessionExpiredInMills := config2.GlobalConfig().Server.GRPCOption.SessionExpiredInMills
	tk := time.NewTicker(sessionExpiredInMills)
	go func() {
		for range tk.C {
			// Phase 1: only collect the expired clients. Deregistration is
			// deferred until the scan is finished because c.DeRegisterClient
			// presumably removes the client from the very set being iterated
			// (TODO confirm), and mutating a set under a live iterator is unsafe.
			var expired []*GroupClient
			c.consumerGroupClients.Range(func(key, value any) bool {
				localClients := value.(*set.Set)
				for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
					lc := iter.Value().(*GroupClient)
					if time.Since(lc.LastUPTime) > sessionExpiredInMills {
						expired = append(expired, lc)
					}
				}
				return true
			})

			// Phase 2: deregister each expired client. A failure for one client
			// is logged and skipped so it cannot prevent the remaining clients
			// (in this or any other group) from being processed.
			var consumerGroupRestart []string
			queued := make(map[string]struct{}, len(expired))
			for _, lc := range expired {
				log.Warnf("client:%v lastUpdate time:%v over three heartbeat cycles. Removing it",
					lc.ConsumerGroup, lc.LastUPTime)
				emconsumer, err := c.GetConsumer(lc.ConsumerGroup)
				if err != nil {
					log.Warnf("get eventmesh consumer:%v failed, err:%v", lc.ConsumerGroup, err)
					continue
				}
				if err := c.DeRegisterClient(lc); err != nil {
					log.Warnf("deregistry client:%v err:%v", lc.ConsumerGroup, err)
					continue
				}
				if ok := emconsumer.DeRegisterClient(lc); !ok {
					log.Warnf("failed deregistry client:%v in eventmesh consumer", lc.ConsumerGroup)
					continue
				}
				// Queue each group once, even if several of its clients expired.
				if _, seen := queued[lc.ConsumerGroup]; !seen {
					queued[lc.ConsumerGroup] = struct{}{}
					consumerGroupRestart = append(consumerGroupRestart, lc.ConsumerGroup)
				}
			}

			// Phase 3: restart every affected group once. Keep going after a
			// failure so one broken group does not block the others.
			for _, rs := range consumerGroupRestart {
				if err := c.RestartConsumer(rs); err != nil {
					log.Warnf("restart consumer:%v err:%v", rs, err)
				}
			}
		}
	}()
}