func NewWebhookGroupTopicOption()

in runtime/core/protocol/grpc/consumer/consumer_group_option.go [118:160]


func NewWebhookGroupTopicOption(cg string,
	topic string,
	mode pb.Subscription_SubscriptionItem_SubscriptionMode,
	gtype consts.GRPCType) ConsumerGroupTopicOption {
	opt := &WebhookGroupTopicOption{
		BaseConsumerGroupTopicOption: &BaseConsumerGroupTopicOption{
			consumerGroup:    cg,
			topic:            topic,
			subscriptionMode: mode,
			gRPCType:         gtype,
		},
		idcWebhookURLs: new(sync.Map),
		allURLs:        set.New(set.WithGoroutineSafe()),
	}
	opt.BaseConsumerGroupTopicOption.registerClient = func(cli *GroupClient) {
		if cli.GRPCType != consts.WEBHOOK {
			log.Warnf("invalid grpc type:%v, with provide WEBHOOK", cli.GRPCType)
			return
		}
		iwu, ok := opt.idcWebhookURLs.Load(cli.IDC)
		if !ok {
			newIDCURLs := set.New(set.WithGoroutineSafe())
			newIDCURLs.Insert(cli.URL)
			opt.idcWebhookURLs.Store(cli.IDC, newIDCURLs)
		} else {
			val := iwu.(*set.Set)
			val.Insert(cli.URL)
			opt.idcWebhookURLs.Store(cli.IDC, val)
		}
		opt.allURLs.Insert(cli.URL)
	}

	opt.BaseConsumerGroupTopicOption.deregisterClient = func(cli *GroupClient) {
		val, ok := opt.idcWebhookURLs.Load(cli.IDC)
		if !ok {
			return
		}
		idcURLs := val.(*set.Set)
		idcURLs.Erase(cli.URL)
		opt.allURLs.Erase(cli.URL)
	}
	return opt
}