datasource/mongo/sd/mongo_cacher.go (318 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sd

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/apache/servicecomb-service-center/datasource/sdcommon"
	"github.com/apache/servicecomb-service-center/pkg/goutil"
	"github.com/apache/servicecomb-service-center/pkg/log"
	"github.com/apache/servicecomb-service-center/pkg/util"
	rmodel "github.com/go-chassis/cari/discovery"
	"github.com/go-chassis/foundation/backoff"
	"github.com/go-chassis/foundation/gopool"
)

// MongoCacher manages the mongo cache.
// To update the cache, MongoCacher watches mongo events and also pulls data
// periodically from mongo. When the cache data changes, MongoCacher creates
// events and notifies its subscribers.
// Use Options to set its behaviors.
type MongoCacher struct { Options *Options reListCount int isFirstTime bool cache MongoCache ready chan struct{} lw sdcommon.ListWatch mux sync.Mutex once sync.Once goroutine *gopool.Pool } func (c *MongoCacher) Cache() MongoCache { return c.cache } func (c *MongoCacher) Run() { c.once.Do(func() { c.goroutine.Do(c.refresh) }) } func (c *MongoCacher) Stop() { c.goroutine.Close(true) util.SafeCloseChan(c.ready) } func (c *MongoCacher) Ready() <-chan struct{} { return c.ready } func (c *MongoCacher) IsReady() bool { select { case <-c.ready: return true default: return false } } func (c *MongoCacher) needList() bool { if c.isFirstTime { c.isFirstTime = false return true } c.reListCount++ if c.reListCount < sdcommon.DefaultForceListInterval { return false } c.reListCount = 0 return true } func (c *MongoCacher) doList(cfg sdcommon.ListWatchConfig) error { resp, err := c.lw.List(cfg) if err != nil { return err } resources := resp.Resources defer log.Debug(fmt.Sprintf("finish to cache key %s, %d items", c.Options.Key, len(resources))) //just reset the cacher if cache marked dirty if c.cache.Dirty() { c.reset(resources) log.Warn(fmt.Sprintf("Cache[%s] is reset!", c.cache.Name())) return nil } // calc and return the diff between cache and mongodb events := c.filter(resources) //notify the subscribers c.sync(events) return nil } func (c *MongoCacher) reset(infos []*sdcommon.Resource) { // clear cache before Set is safe, because the watch operation is stop, // but here will make all API requests go to MONGO directly. c.cache.Clear() // do not notify when cacher is dirty status, // otherwise, too many events will notify to downstream. 
c.buildCache(c.filter(infos)) } func (c *MongoCacher) doWatch(cfg sdcommon.ListWatchConfig) error { if eventbus := c.lw.EventBus(cfg); eventbus != nil { return c.handleEventBus(eventbus) } return fmt.Errorf("handle a nil watcher") } func (c *MongoCacher) ListAndWatch(ctx context.Context) error { c.mux.Lock() defer c.mux.Unlock() defer log.Recover() // ensure ListAndWatch never raise panic cfg := sdcommon.ListWatchConfig{ Timeout: c.Options.Timeout, Context: ctx, } // first time should initial cache, set watch timeout less if c.isFirstTime { cfg.Timeout = FirstTimeout } err := c.doWatch(cfg) if err != nil { log.Error("doWatch err", err) } // the scenario need to list mongo: // 1. Initial: cache is building, the lister's is first time to run. // 2. Runtime: error occurs in previous watch operation, the lister's status is set to error. // 3. Runtime: watch operation timed out over DEFAULT_FORCE_LIST_INTERVAL times. if c.needList() { // recover timeout for list if c.isFirstTime { cfg.Timeout = c.Options.Timeout } if err := c.doList(cfg); err != nil && (!c.IsReady()) { log.Error("doList error", err) return err // do retry to list mongo } // keep going to next step: // 1. doList return OK. // 2. 
some traps in mongo client } util.SafeCloseChan(c.ready) return nil } func (c *MongoCacher) handleEventBus(eventbus *sdcommon.EventBus) error { defer eventbus.Stop() if eventbus.Bus == nil { return nil } for resp := range eventbus.ResourceEventBus() { events := make([]MongoEvent, 0) if resp == nil { return errors.New("handle watcher error") } for _, resource := range resp.Resources { if resource.Value == nil { log.Error(fmt.Sprintf("get nil value while watch for mongocache,the docID is %s", resource.Key), nil) break } action := resp.Action var event MongoEvent switch action { case sdcommon.ActionCreate: event = NewMongoEventByResource(resource, rmodel.EVT_CREATE) case sdcommon.ActionUpdate: event = NewMongoEventByResource(resource, rmodel.EVT_UPDATE) case sdcommon.ActionDelete: resource.Value = c.cache.Get(resource.Key) event = NewMongoEventByResource(resource, rmodel.EVT_DELETE) } events = append(events, event) } c.sync(events) log.Debug(fmt.Sprintf("finish to handle %d events, table: %s", len(events), c.Options.Key)) } return nil } func (c *MongoCacher) refresh(ctx context.Context) { log.Debug(fmt.Sprintf("start to list and watch %s", c.Options)) retries := 0 timer := time.NewTimer(sdcommon.MinWaitInterval) defer timer.Stop() for { nextPeriod := sdcommon.MinWaitInterval if err := c.ListAndWatch(ctx); err != nil { retries++ nextPeriod = backoff.GetBackoff().Delay(retries) } else { retries = 0 } select { case <-ctx.Done(): log.Debug(fmt.Sprintf("stop to list and watch %s", c.Options)) return case <-timer.C: timer.Reset(nextPeriod) } } } // keep the evts valID when call sync func (c *MongoCacher) sync(evts []MongoEvent) { if len(evts) == 0 { return } go c.onEvents(evts) } func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent { nc := len(infos) newStore := make(map[string]interface{}, nc) for _, info := range infos { newStore[info.Key] = info.Value } filterStopCh := make(chan struct{}) eventsCh := make(chan [sdcommon.EventBlockSize]MongoEvent, 2) go 
c.filterDelete(newStore, eventsCh, filterStopCh) go c.filterCreateOrUpdate(newStore, eventsCh, filterStopCh) events := make([]MongoEvent, 0, nc) for block := range eventsCh { for _, e := range block { if e.Value == nil { log.Error(fmt.Sprintf("get nil value while do list, the docID is %s", e.DocumentID), nil) break } events = append(events, e) } } return events } func (c *MongoCacher) filterDelete(newStore map[string]interface{}, eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan struct{}) { var block [sdcommon.EventBlockSize]MongoEvent i := 0 c.cache.ForEach(func(k string, v interface{}) (next bool) { next = true _, ok := newStore[k] if ok { // k in store, also in new store, is not deleted, return return } // k in store but not in new store, it means k is deleted if i >= sdcommon.EventBlockSize { eventsCh <- block block = [sdcommon.EventBlockSize]MongoEvent{} i = 0 } block[i] = NewMongoEvent(k, rmodel.EVT_DELETE, v) i++ return }) if i > 0 { eventsCh <- block } close(filterStopCh) } func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{}, eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan struct{}) { var block [sdcommon.EventBlockSize]MongoEvent i := 0 for k, v := range newStore { ov := c.cache.Get(k) if ov == nil { if i >= sdcommon.EventBlockSize { eventsCh <- block block = [sdcommon.EventBlockSize]MongoEvent{} i = 0 } block[i] = NewMongoEvent(k, rmodel.EVT_CREATE, v) i++ continue } if c.cache.isValueNotUpdated(v, ov) { continue } log.Debug(fmt.Sprintf("value is updateOp of key:%s, old value is:%s, new value is:%s", k, ov, v)) if i >= sdcommon.EventBlockSize { eventsCh <- block block = [sdcommon.EventBlockSize]MongoEvent{} i = 0 } block[i] = NewMongoEvent(k, rmodel.EVT_UPDATE, v) i++ } if i > 0 { eventsCh <- block } <-filterStopCh close(eventsCh) } func (c *MongoCacher) onEvents(events []MongoEvent) { c.buildCache(events) c.notify(events) } func (c *MongoCacher) buildCache(events []MongoEvent) { for i, evt := 
range events { key := evt.DocumentID value := c.cache.Get(key) ok := value != nil switch evt.Type { case rmodel.EVT_CREATE, rmodel.EVT_UPDATE: switch { case !c.IsReady(): evt.Type = rmodel.EVT_INIT case !ok && evt.Type != rmodel.EVT_CREATE: log.Warn(fmt.Sprintf("unexpected %s event! it should be %s key %s", evt.Type, rmodel.EVT_CREATE, key)) evt.Type = rmodel.EVT_CREATE case ok && evt.Type != rmodel.EVT_UPDATE: log.Warn(fmt.Sprintf("unexpected %s event! it should be %s key %s", evt.Type, rmodel.EVT_UPDATE, key)) evt.Type = rmodel.EVT_UPDATE } c.cache.ProcessUpdate(evt) events[i] = evt case rmodel.EVT_DELETE: if !ok { log.Warn(fmt.Sprintf("unexpected %s event! key %s does not cache", evt.Type, key)) } else { evt.Value = value c.cache.ProcessDelete(evt) } events[i] = evt } } } func (c *MongoCacher) notify(evts []MongoEvent) { eventProxy := EventProxy(c.Options.Key) if eventProxy == nil { return } defer log.Recover() for _, evt := range evts { if evt.Type == rmodel.EVT_DELETE && evt.Value == nil { log.Warn(fmt.Sprintf("caught delete event:%s, but value can't get from caches, it may be deleted by last list", evt.DocumentID)) continue } eventProxy.OnEvent(evt) } } func NewMongoCacher(options *Options, cache MongoCache, pf parsefunc) *MongoCacher { return &MongoCacher{ Options: options, isFirstTime: true, cache: cache, ready: make(chan struct{}), lw: &mongoListWatch{ Key: options.Key, parseFunc: pf, }, goroutine: goutil.New(), } }