in runtime/core/protocol/grpc/consumer/consumer_manager.go [74:103]
func (c *consumerManager) RegisterClient(cli *GroupClient) error {
val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
if !ok {
cliset := set.New(set.WithGoroutineSafe())
cliset.Insert(cli)
c.consumerGroupClients.Store(cli.ConsumerGroup, cliset)
return nil
}
localClients := val.(*set.Set)
found := false
for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
lc := iter.Value().(*GroupClient)
if lc.GRPCType == consts.WEBHOOK {
lc.URL = cli.URL
lc.LastUPTime = cli.LastUPTime
found = true
break
}
if lc.GRPCType == consts.STREAM {
lc.Emiter = cli.Emiter
lc.LastUPTime = cli.LastUPTime
found = true
break
}
}
if !found {
localClients.Insert(cli)
}
return nil
}