in runtime/core/protocol/grpc/consumer/consumer_group_option.go [118:160]
func NewWebhookGroupTopicOption(cg string,
topic string,
mode pb.Subscription_SubscriptionItem_SubscriptionMode,
gtype consts.GRPCType) ConsumerGroupTopicOption {
opt := &WebhookGroupTopicOption{
BaseConsumerGroupTopicOption: &BaseConsumerGroupTopicOption{
consumerGroup: cg,
topic: topic,
subscriptionMode: mode,
gRPCType: gtype,
},
idcWebhookURLs: new(sync.Map),
allURLs: set.New(set.WithGoroutineSafe()),
}
opt.BaseConsumerGroupTopicOption.registerClient = func(cli *GroupClient) {
if cli.GRPCType != consts.WEBHOOK {
log.Warnf("invalid grpc type:%v, with provide WEBHOOK", cli.GRPCType)
return
}
iwu, ok := opt.idcWebhookURLs.Load(cli.IDC)
if !ok {
newIDCURLs := set.New(set.WithGoroutineSafe())
newIDCURLs.Insert(cli.URL)
opt.idcWebhookURLs.Store(cli.IDC, newIDCURLs)
} else {
val := iwu.(*set.Set)
val.Insert(cli.URL)
opt.idcWebhookURLs.Store(cli.IDC, val)
}
opt.allURLs.Insert(cli.URL)
}
opt.BaseConsumerGroupTopicOption.deregisterClient = func(cli *GroupClient) {
val, ok := opt.idcWebhookURLs.Load(cli.IDC)
if !ok {
return
}
idcURLs := val.(*set.Set)
idcURLs.Erase(cli.URL)
opt.allURLs.Erase(cli.URL)
}
return opt
}