datasource/etcd/event/dependency_event_handler.go (193 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package event import ( "context" "fmt" "sync" "time" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/etcd/path" "github.com/apache/servicecomb-service-center/datasource/etcd/sd" "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/queue" "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/config" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/cari/dlock" "github.com/go-chassis/foundation/backoff" "github.com/go-chassis/foundation/gopool" "github.com/go-chassis/foundation/stringutil" "github.com/go-chassis/foundation/timeutil" "github.com/little-cui/etcdadpt" ) const depQueueLockKey = "/dep-queue" // just for unit test var testMux sync.Mutex // DependencyEventHandler add or remove the service dependencies // when user call find instance api or dependence operation api type DependencyEventHandler struct { signals *queue.UniQueue } func (h *DependencyEventHandler) Type() kvstore.Type { return sd.TypeDependencyQueue } func (h *DependencyEventHandler) OnEvent(evt kvstore.Event) { action := evt.Type if action != pb.EVT_CREATE && action != pb.EVT_UPDATE && action != pb.EVT_INIT { return } h.notify() } func (h *DependencyEventHandler) notify() { err := h.signals.Put(struct{}{}) if err != nil { log.Error("", err) } } func (h *DependencyEventHandler) backoff(f func(), retries int) int { if f != nil { <-time.After(backoff.GetBackoff().Delay(retries)) f() } return retries + 1 } func (h *DependencyEventHandler) tryWithBackoff(success func() error, backoff func(), retries int) (int, error) { defer log.Recover() if err := dlock.TryLock(depQueueLockKey, -1); err != nil { log.Error(fmt.Sprintf("try to lock %s failed", depQueueLockKey), err) return 0, nil } defer func() { if err := dlock.Unlock(depQueueLockKey); err != nil { log.Error("unlock failed", err) } }() err := success() if err != nil { log.Error("handle dependency event failed", err) return h.backoff(backoff, retries), err } return 0, nil } func (h *DependencyEventHandler) eventLoop() { gopool.Go(func(ctx context.Context) { // the events will lose, need to handle dependence records periodically period := config.GetRegistry().CacheTTL timer := time.NewTimer(period) retries := 0 for { select { case <-ctx.Done(): return case <-h.signals.Chan(): _, err := h.tryWithBackoff(h.Handle, h.notify, retries) if err != nil { log.Error("", err) } timeutil.ResetTimer(timer, period) case <-timer.C: h.notify() timer.Reset(period) } } }) } type DependencyEventHandlerResource struct { dep *pb.ConsumerDependency kv *kvstore.KeyValue domainProject string } func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *kvstore.KeyValue, domainProject string) *DependencyEventHandlerResource { return &DependencyEventHandlerResource{ dep, kv, domainProject, } } func (h *DependencyEventHandler) Handle() error { testMux.Lock() defer testMux.Unlock() key := path.GetServiceDependencyQueueRootKey("") resp, err := sd.DependencyQueue().Search(context.Background(), etcdadpt.WithNoCache(), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix()) if err != nil { return err } // maintain dependency rules. l := len(resp.Kvs) if l == 0 { return nil } cleanUpDomainProjects := make(map[string]struct{}) defer h.CleanUp(cleanUpDomainProjects) for _, keyValue := range resp.Kvs { r, ok := keyValue.Value.(*pb.ConsumerDependency) if !ok { log.Error("failed to assert consumerDependency", datasource.ErrAssertFail) continue } _, domainProject, uuid := path.GetInfoFromDependencyQueueKV(keyValue.Key) if uuid == path.DepsQueueUUID { cleanUpDomainProjects[domainProject] = struct{}{} } res := NewDependencyEventHandlerResource(r, keyValue, domainProject) if err := h.dependencyRuleHandle(res); err != nil { return err } } return nil } func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error { ctx := util.WithGlobal(context.Background()) dependencyEventHandlerRes := res.(*DependencyEventHandlerResource) r := dependencyEventHandlerRes.dep consumerFlag := util.StringJoin([]string{r.Consumer.Environment, r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/") domainProject := dependencyEventHandlerRes.domainProject consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0] providersInfo := pb.DependenciesToKeys(r.Providers, domainProject) var ( dep serviceUtil.Dependency err error ) dep.DomainProject = domainProject dep.Consumer = consumerInfo dep.ProvidersRule = providersInfo if r.Override { err = serviceUtil.CreateDependencyRule(ctx, &dep) } else { err = serviceUtil.AddDependencyRule(ctx, &dep) } if err != nil { log.Error(fmt.Sprintf("modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag), err) return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error()) } if err = h.removeKV(ctx, dependencyEventHandlerRes.kv); err != nil { log.Error(fmt.Sprintf("remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag), err) return err } log.Info(fmt.Sprintf("maintain dependency [%v] successfully", r)) return nil } func (h *DependencyEventHandler) removeKV(ctx context.Context, kv *kvstore.KeyValue) error { dResp, err := etcdadpt.TxnWithCmp(ctx, etcdadpt.Ops(etcdadpt.OpDel(etcdadpt.WithKey(kv.Key))), etcdadpt.If(etcdadpt.EqualVer(stringutil.Bytes2str(kv.Key), kv.Version)), nil) if err != nil { return fmt.Errorf("can not remove the dependency %s request, %s", util.BytesToStringWithNoCopy(kv.Key), err.Error()) } if !dResp.Succeeded { log.Info(fmt.Sprintf("the dependency %s request is changed", util.BytesToStringWithNoCopy(kv.Key))) } return nil } func (h *DependencyEventHandler) CleanUp(domainProjects map[string]struct{}) { for domainProject := range domainProjects { ctx := util.WithGlobal(context.Background()) if err := serviceUtil.CleanUpDependencyRules(ctx, domainProject); err != nil { log.Error(fmt.Sprintf("clean up '%s' dependency rules failed", domainProject), err) } } } func NewDependencyEventHandler() *DependencyEventHandler { h := &DependencyEventHandler{ signals: queue.NewUniQueue(), } h.eventLoop() return h }