func NewWStreamGroupTopicOption()

in runtime/core/protocol/grpc/consumer/consumer_group_option.go [230:286]


// NewWStreamGroupTopicOption creates a StreamGroupTopicOption for the given
// consumer group, topic, subscription mode and gRPC type, and installs the
// registerClient / deregisterClient hooks on it.
//
// Emitters are tracked in idcEmitterMap, a two-level sync.Map:
// IDC -> (client key -> emitter), where the client key is produced by
// uniqClient(ip, pid). After every registration or deregistration the hooks
// call buildIdcEmitter and buildTotalEmitter so the derived emitter views are
// rebuilt from idcEmitterMap.
func NewWStreamGroupTopicOption(cg string,
	topic string,
	mode pb.Subscription_SubscriptionItem_SubscriptionMode,
	gtype consts.GRPCType) ConsumerGroupTopicOption {
	opt := &StreamGroupTopicOption{
		BaseConsumerGroupTopicOption: &BaseConsumerGroupTopicOption{
			consumerGroup:    cg,
			topic:            topic,
			subscriptionMode: mode,
			gRPCType:         gtype,
		},
		idcEmitterMap: new(sync.Map),
		idcEmitters:   new(sync.Map),
		totalEmitters: set.New(set.WithGoroutineSafe()),
	}

	// registerClient records the client's emitter under its IDC and client key.
	opt.registerClient = func(cli *GroupClient) {
		idc := cli.IDC
		key := uniqClient(cli.IP, cli.PID)

		// LoadOrStore makes "get or create the per-IDC map" atomic, so two
		// clients registering concurrently for a new IDC cannot each install
		// their own map and drop the other's emitter.
		em, _ := opt.idcEmitterMap.LoadOrStore(idc, new(sync.Map))
		emitters := em.(*sync.Map)
		emitters.Store(key, cli.Emiter)

		opt.buildIdcEmitter()
		opt.buildTotalEmitter()
	}

	// deregisterClient removes the client's emitter and drops the per-IDC map
	// once no clients remain in it. Unknown IDCs are ignored.
	opt.deregisterClient = func(cli *GroupClient) {
		idc := cli.IDC
		key := uniqClient(cli.IP, cli.PID)

		em, ok := opt.idcEmitterMap.Load(idc)
		if !ok {
			return
		}
		emitters := em.(*sync.Map)
		emitters.Delete(key)

		// Stop at the first entry: only emptiness matters, not the count.
		empty := true
		emitters.Range(func(_, _ any) bool {
			empty = false
			return false
		})
		if empty {
			// idcEmitterMap is keyed by IDC, so the empty per-IDC map must be
			// removed by idc (not by the client key).
			opt.idcEmitterMap.Delete(idc)
		}

		opt.buildIdcEmitter()
		opt.buildTotalEmitter()
	}
	return opt
}