server/pubsub/notifier/kv.go (109 lines of code) (raw):
package notifier
import (
"context"
"sync"
"time"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/cache"
"github.com/apache/servicecomb-kie/server/pubsub"
kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"github.com/go-chassis/openlog"
"github.com/hashicorp/serf/serf"
)
// KVHandler handler serf custom event, it is singleton
type KVHandler struct {
BatchSize int
BatchInterval time.Duration
Immediate bool
pendingEvents sync.Map
pendingEventsCount int
}
func (h *KVHandler) RunFlushTask() {
for {
if h.pendingEventsCount >= h.BatchSize {
h.fireEvents()
}
<-time.After(h.BatchInterval)
h.fireEvents()
}
}
func (h *KVHandler) HandleEvent(e serf.Event) {
ue := e.(serf.UserEvent)
ke, err := pubsub.NewKVChangeEvent(ue.Payload)
if err != nil {
openlog.Error("invalid json:" + string(ue.Payload))
}
openlog.Debug("kv event:" + ke.Key)
if h.Immediate { //never retain event, not recommended
h.FindTopicAndFire(ke)
} else {
h.mergeAndSave(ke)
}
}
func (h *KVHandler) mergeAndSave(ke *pubsub.KVChangeEvent) {
id := ke.String()
_, ok := h.pendingEvents.Load(id)
if ok {
openlog.Debug("ignore same event: " + id)
return
}
h.pendingEvents.Store(id, ke)
h.pendingEventsCount++
}
func (h *KVHandler) fireEvents() {
h.pendingEvents.Range(func(key, value interface{}) bool {
ke := value.(*pubsub.KVChangeEvent)
h.FindTopicAndFire(ke)
h.pendingEvents.Delete(key)
h.pendingEventsCount--
return true
})
}
func (h *KVHandler) FindTopicAndFire(ke *pubsub.KVChangeEvent) {
topic := pubsub.Topics()
topic.Range(func(key, value interface{}) bool { //range all topics
t, err := pubsub.ParseTopic(key.(string))
if err != nil {
openlog.Error("can not parse topic " + key.(string) + ": " + err.Error())
return true
}
if t.Match(ke) {
prepareCache(key.(string), t)
notifyAndRemoveObservers(value, ke)
}
return true
})
}
func prepareCache(topicName string, topic *pubsub.Topic) {
rev, kvs, err := kvsvc.ListKV(context.TODO(), &model.ListKVRequest{
Domain: topic.DomainID,
Project: topic.Project,
Labels: topic.Labels,
Match: topic.MatchType,
})
if err != nil {
openlog.Error("can not query kvs:" + err.Error())
}
cache.CachedKV().Write(topicName, &cache.DBResult{
KVs: kvs,
Rev: rev,
Err: err,
})
}
func notifyAndRemoveObservers(value interface{}, ke *pubsub.KVChangeEvent) {
observers := value.(*sync.Map)
observers.Range(func(id, value interface{}) bool {
observer := value.(*pubsub.Observer)
observer.Event <- ke
observers.Delete(id)
return true
})
}
func init() {
h := &KVHandler{
BatchInterval: pubsub.DefaultEventBatchInterval,
BatchSize: pubsub.DefaultEventBatchSize,
Immediate: true,
}
pubsub.RegisterHandler(pubsub.EventKVChange, h)
go h.RunFlushTask()
}