datasource/etcd/sd/instance_event_handler.go (156 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package sd import ( "context" "fmt" "sync" "time" "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" "github.com/go-chassis/cari/discovery" "github.com/go-chassis/foundation/gopool" "github.com/go-chassis/foundation/timeutil" ) type deferItem struct { ReplayAfter int32 // in seconds event kvstore.Event } type InstanceEventDeferHandler struct { Percent float64 cache kvstore.CacheReader once sync.Once enabled bool items map[string]*deferItem evts chan []kvstore.Event replayCh chan kvstore.Event resetCh chan struct{} } func (iedh *InstanceEventDeferHandler) OnCondition(cache kvstore.CacheReader, evts []kvstore.Event) bool { if iedh.Percent <= 0 { return false } iedh.once.Do(func() { iedh.cache = cache iedh.items = make(map[string]*deferItem) iedh.evts = make(chan []kvstore.Event, eventBlockSize) iedh.replayCh = make(chan kvstore.Event, eventBlockSize) iedh.resetCh = make(chan struct{}) gopool.Go(iedh.check) }) iedh.evts <- evts return true } func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt kvstore.Event) { if evt.KV == nil { log.Error(fmt.Sprintf("defer or replayEvent a %s nil KV", evt.Type), nil) return } kv := evt.KV key := util.BytesToStringWithNoCopy(kv.Key) _, ok := iedh.items[key] switch evt.Type { case discovery.EVT_CREATE, discovery.EVT_UPDATE: if ok { log.Info(fmt.Sprintf("recovered key %s events", key)) // return nil // no need to publish event to subscribers? } iedh.replayEvent(evt) case discovery.EVT_DELETE: if ok { return } instance := kv.Value.(*discovery.MicroServiceInstance) if instance == nil { log.Error(fmt.Sprintf("defer or replayEvent a %s nil Value, KV is %v", evt.Type, kv), nil) return } ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1) if ttl <= 0 || ttl > selfPreservationMaxTTL { ttl = selfPreservationMaxTTL } iedh.items[key] = &deferItem{ ReplayAfter: ttl, event: evt, } } } func (iedh *InstanceEventDeferHandler) HandleChan() <-chan kvstore.Event { return iedh.replayCh } func (iedh *InstanceEventDeferHandler) check(ctx context.Context) { defer log.Recover() t, n := time.NewTimer(deferCheckWindow), false defer t.Stop() for { select { case <-ctx.Done(): log.Error("self preservation routine dead", nil) return case evts := <-iedh.evts: for _, evt := range evts { iedh.recoverOrDefer(evt) } del := len(iedh.items) if del == 0 { continue } if iedh.enabled { continue } total := iedh.cache.GetAll(nil) if total > selfPreservationInitCount && float64(del) >= float64(total)*iedh.Percent { iedh.enabled = true log.Warn(fmt.Sprintf("self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events", del, total, iedh.Percent*100)) } if !n { timeutil.ResetTimer(t, deferCheckWindow) n = true } case <-t.C: n = false t.Reset(deferCheckWindow) if !iedh.enabled { for _, item := range iedh.items { iedh.replayEvent(item.event) } continue } iedh.ReplayEvents() case <-iedh.resetCh: iedh.ReplayEvents() iedh.enabled = false timeutil.ResetTimer(t, deferCheckWindow) } } } func (iedh *InstanceEventDeferHandler) ReplayEvents() { interval := int32(deferCheckWindow / time.Second) for key, item := range iedh.items { item.ReplayAfter -= interval if item.ReplayAfter > 0 { continue } log.Warn(fmt.Sprintf("replay delete event, remove key: %s", key)) iedh.replayEvent(item.event) } if len(iedh.items) == 0 { iedh.enabled = false log.Warn("self preservation stopped") } } func (iedh *InstanceEventDeferHandler) replayEvent(evt kvstore.Event) { key := util.BytesToStringWithNoCopy(evt.KV.Key) delete(iedh.items, key) iedh.replayCh <- evt } func (iedh *InstanceEventDeferHandler) Reset() bool { if iedh.enabled || len(iedh.items) != 0 { log.Warn("self preservation is reset") iedh.resetCh <- struct{}{} return true } return false } func NewInstanceEventDeferHandler() *InstanceEventDeferHandler { return &InstanceEventDeferHandler{Percent: selfPreservationPercentage} }