func()

in runtime/core/protocol/grpc/consumer/consumer_manager.go [105:126]


// DeRegisterClient removes, from the client set stored under
// cli.ConsumerGroup, every client whose Topic equals cli.Topic. When the set
// becomes empty, the consumer group entry itself is deleted from
// consumerGroupClients.
//
// If no entry exists for the consumer group, a debug message is logged and
// nothing else happens. The method currently always returns nil.
func (c *consumerManager) DeRegisterClient(cli *GroupClient) error {
	val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
	if !ok {
		log.Debugf("no consumer group client found, name:%v", cli.ConsumerGroup)
		return nil
	}
	localClients := val.(*set.Set)

	// Collect the matching clients first and erase them only after the
	// traversal has finished. Erasing while the iterator is still live mutates
	// the container underneath it, which can make Next() skip elements or walk
	// nodes that are no longer part of the set.
	// NOTE(review): presumably set.Set is a tree-backed container whose
	// iterators are not erase-safe — confirm against the set package.
	var matched []*GroupClient
	for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
		lc := iter.Value().(*GroupClient)
		if lc.Topic != cli.Topic {
			continue
		}
		if lc.GRPCType == consts.STREAM {
			// TODO
			// close the GRPC client stream before removing it
		}
		matched = append(matched, lc)
	}
	for _, lc := range matched {
		localClients.Erase(lc)
	}

	// Drop the group entry once its last client is gone.
	if localClients.Size() == 0 {
		c.consumerGroupClients.Delete(cli.ConsumerGroup)
	}
	return nil
}