func()

in runtime/core/protocol/grpc/consumer/consumer_manager.go [74:103]


// RegisterClient registers cli under cli.ConsumerGroup, or refreshes an
// already-registered client of the same gRPC type.
//
// When the group has no entry yet, a new goroutine-safe set containing cli is
// stored for it. Otherwise the group's set is scanned for the first client
// whose GRPCType equals cli.GRPCType:
//   - a WEBHOOK client gets its URL and LastUPTime replaced by cli's;
//   - a STREAM client gets its Emiter and LastUPTime replaced by cli's.
//
// If no existing client is refreshed, cli is inserted into the group's set.
//
// The returned error is always nil.
func (c *consumerManager) RegisterClient(cli *GroupClient) error {
	val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
	if !ok {
		// First client seen for this group: create the group's client set.
		// NOTE(review): Load followed by Store is not atomic; if this method
		// can run concurrently for the same group, one of two first
		// registrations may be lost — confirm against callers.
		cliset := set.New(set.WithGoroutineSafe())
		cliset.Insert(cli)
		c.consumerGroupClients.Store(cli.ConsumerGroup, cliset)
		return nil
	}

	localClients := val.(*set.Set)
	for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
		lc := iter.Value().(*GroupClient)
		// Only a client of the same type may be refreshed from cli. Without
		// this guard a STREAM registration would overwrite a WEBHOOK client's
		// URL (and vice versa for Emiter), and cli itself would never be
		// added to the set.
		if lc.GRPCType != cli.GRPCType {
			continue
		}
		// NOTE(review): any same-type client is treated as the same client;
		// presumably further identity fields should be compared — verify
		// against GroupClient's definition.
		if lc.GRPCType == consts.WEBHOOK {
			lc.URL = cli.URL
			lc.LastUPTime = cli.LastUPTime
			return nil
		}
		if lc.GRPCType == consts.STREAM {
			lc.Emiter = cli.Emiter
			lc.LastUPTime = cli.LastUPTime
			return nil
		}
	}

	// No existing client was refreshed: register cli as a new group member.
	localClients.Insert(cli)
	return nil
}