in runtime/core/protocol/grpc/consumer/consumer_group_option.go [230:286]
func NewWStreamGroupTopicOption(cg string,
topic string,
mode pb.Subscription_SubscriptionItem_SubscriptionMode,
gtype consts.GRPCType) ConsumerGroupTopicOption {
opt := &StreamGroupTopicOption{
BaseConsumerGroupTopicOption: &BaseConsumerGroupTopicOption{
consumerGroup: cg,
topic: topic,
subscriptionMode: mode,
gRPCType: gtype,
},
idcEmitterMap: new(sync.Map),
idcEmitters: new(sync.Map),
totalEmitters: set.New(set.WithGoroutineSafe()),
}
opt.registerClient = func(cli *GroupClient) {
idc := cli.IDC
clientIP := cli.IP
pid := cli.PID
emt := cli.Emiter
key := uniqClient(clientIP, pid)
var emiter *sync.Map
em, ok := opt.idcEmitterMap.Load(idc)
if !ok {
emiter = new(sync.Map)
opt.idcEmitterMap.Store(idc, emiter)
} else {
emiter = em.(*sync.Map)
}
emiter.Store(key, emt)
opt.buildIdcEmitter()
opt.buildTotalEmitter()
}
opt.deregisterClient = func(cli *GroupClient) {
idc := cli.IDC
clientIP := cli.IP
pid := cli.PID
num := 0
key := uniqClient(clientIP, pid)
em, ok := opt.idcEmitterMap.Load(idc)
if !ok {
return
}
emiter := em.(*sync.Map)
emiter.Delete(key)
emiter.Range(func(key, value any) bool {
num++
return true
})
if num == 0 {
opt.idcEmitterMap.Delete(key)
}
opt.buildIdcEmitter()
opt.buildTotalEmitter()
}
return opt
}