in runtime/core/protocol/grpc/consumer/consumer_manager.go [105:126]
func (c *consumerManager) DeRegisterClient(cli *GroupClient) error {
val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
if !ok {
log.Debugf("no consumer group client found, name:%v", cli.ConsumerGroup)
return nil
}
localClients := val.(*set.Set)
for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
lc := iter.Value().(*GroupClient)
if lc.Topic == cli.Topic {
if lc.GRPCType == consts.STREAM {
// TODO
// close the GRPC client stream before removing it
}
localClients.Erase(lc)
}
}
if localClients.Size() == 0 {
c.consumerGroupClients.Delete(cli.ConsumerGroup)
}
return nil
}