server/datasource/etcd/kv/kv_cache.go (305 lines of code) (raw):

package kv import ( "context" "encoding/json" "fmt" "regexp" "strings" "sync" "time" "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/stringutil" "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/foundation/backoff" "github.com/go-chassis/openlog" "github.com/little-cui/etcdadpt" goCache "github.com/patrickmn/go-cache" "go.etcd.io/etcd/api/v3/mvccpb" ) func Init() { kvCache = NewKvCache() go kvCache.Refresh(context.Background()) } var kvCache *Cache const ( prefixKvs = "kvs" cacheExpirationTime = 10 * time.Minute cacheCleanupInterval = 11 * time.Minute etcdWatchTimeout = 1 * time.Hour backOffMinInterval = 5 * time.Second ) type IDSet map[string]struct{} type LabelsSet map[string]struct{} type CacheSearchReq struct { Domain string Project string Opts *datasource.FindOptions Regex *regexp.Regexp } func NewKvCache() *Cache { kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval) labelsSet := LabelsSet{} for _, label := range config.Configurations.Cache.Labels { labelsSet[label] = struct{}{} } return &Cache{ timeOut: etcdWatchTimeout, client: etcdadpt.Instance(), revision: 0, kvDocCache: kvDocCache, labelsSet: labelsSet, } } func Enabled() bool { return kvCache != nil } type Cache struct { timeOut time.Duration client etcdadpt.Client revision int64 kvIDCache sync.Map kvDocCache *goCache.Cache labelsSet LabelsSet } func (kc *Cache) Refresh(ctx context.Context) { openlog.Info("start to list and watch") retries := 0 timer := time.NewTimer(backOffMinInterval) defer timer.Stop() for { nextPeriod := backOffMinInterval if err := kc.listWatch(ctx); err != nil { retries++ nextPeriod = backoff.GetBackoff().Delay(retries) } else { retries = 0 } select { case <-ctx.Done(): openlog.Info("stop to list and watch") return case <-timer.C: timer.Reset(nextPeriod) } } } func (kc *Cache) listWatch(ctx context.Context) error { rsp, err := kc.list(ctx) if err != nil { return err } kc.revision = rsp.Revision kc.cachePut(rsp) return kc.watch(ctx) } func (kc *Cache) watch(ctx context.Context) error { timoutCtx, cancel := context.WithTimeout(ctx, kc.timeOut) defer cancel() rev := kc.revision opts := append( etcdadpt.WatchPrefixOpOptions(prefixKvs), etcdadpt.WithRev(kc.revision+1), etcdadpt.WithWatchCallback(kc.watchCallBack), ) err := kc.client.Watch(timoutCtx, opts...) if err != nil { openlog.Error(fmt.Sprintf("watch prefix %s failed, start rev: %d+1->%d->0, err %v", prefixKvs, rev, kc.revision, err)) kc.revision = 0 } return err } func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) { rsp, err := kc.client.Do(ctx, etcdadpt.WatchPrefixOpOptions(prefixKvs)...) if err != nil { openlog.Error(fmt.Sprintf("list prefix %s failed, current rev: %d, err, %v", prefixKvs, kc.revision, err)) return nil, err } return rsp, nil } func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error { if rsp == nil || len(rsp.Kvs) == 0 { return fmt.Errorf("unknown event") } kc.revision = rsp.Revision switch rsp.Action { case etcdadpt.ActionPut: kc.cachePut(rsp) case etcdadpt.ActionDelete: kc.cacheDelete(rsp) default: openlog.Warn(fmt.Sprintf("unrecognized action::%v", rsp.Action)) } return nil } func (kc *Cache) cachePut(rsp *etcdadpt.Response) { for _, kv := range rsp.Kvs { kvDoc, err := kc.GetKvDoc(kv) if err != nil { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) continue } if !kc.isInLabelsSet(kvDoc.Labels) { continue } kc.StoreKvDoc(kvDoc.ID, kvDoc) cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) if !ok { kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}}) openlog.Info("cacheKey " + cacheKey + "not exists") continue } m[kvDoc.ID] = struct{}{} } } func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) { for _, kv := range rsp.Kvs { kvDoc, err := kc.GetKvDoc(kv) if err != nil { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) continue } kc.DeleteKvDoc(kvDoc.ID) cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) if !ok { openlog.Error("cacheKey " + cacheKey + "not exists") continue } delete(m, kvDoc.ID) } } func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) { val, ok := kc.kvIDCache.Load(cacheKey) if !ok { return nil, false } kvIds, ok := val.(IDSet) if !ok { return nil, false } return kvIds, true } func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) { kc.kvIDCache.Store(cacheKey, kvIds) } func (kc *Cache) LoadKvDoc(kvID string) (*model.KVDoc, bool) { val, ok := kc.kvDocCache.Get(kvID) if !ok { return nil, false } doc, ok := val.(*model.KVDoc) if !ok { return nil, false } return doc, true } func (kc *Cache) StoreKvDoc(kvID string, kvDoc *model.KVDoc) { kc.kvDocCache.SetDefault(kvID, kvDoc) } func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { if len(kvIdsLeft) == 0 { return nil } openlog.Debug("get kv from etcd by kvId") wg := sync.WaitGroup{} docs := make([]*model.KVDoc, len(kvIdsLeft)) for i, kvID := range kvIdsLeft { wg.Add(1) go func(kvID string, cnt int) { defer wg.Done() docKey := key.KV(req.Domain, req.Project, kvID) kv, err := etcdadpt.Get(ctx, docKey) if err != nil { openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err)) return } doc, err := kc.GetKvDoc(kv) if err != nil { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) return } kc.StoreKvDoc(doc.ID, doc) docs[cnt] = doc }(kvID, i) } wg.Wait() return docs } func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) { kvDoc := &model.KVDoc{} err := json.Unmarshal(kv.Value, kvDoc) if err != nil { return nil, err } return kvDoc, nil } func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) string { labelFormat := stringutil.FormatMap(labels) inputKey := strings.Join([]string{ "", domain, project, labelFormat, }, "/") return inputKey } func (kc *Cache) isInLabelsSet(Labels map[string]string) bool { for label := range Labels { if _, ok := kc.labelsSet[label]; !ok { return false } } return true } func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { result := &model.KVResponse{ Data: []*model.KVDoc{}, } if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) { return result, false, nil } openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) kvIds, ok := kvCache.LoadKvIDSet(cacheKey) if !ok { kvCache.StoreKvIDSet(cacheKey, IDSet{}) return result, true, nil } var docs []*model.KVDoc var kvIdsLeft []string for kvID := range kvIds { if doc, ok := kvCache.LoadKvDoc(kvID); ok { docs = append(docs, doc) continue } kvIdsLeft = append(kvIdsLeft, kvID) } tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) docs = append(docs, tpData...) for _, doc := range docs { if isMatch(req, doc) { datasource.ClearPart(doc) result.Data = append(result.Data, doc) } } result.Total = len(result.Data) return result, true, nil } func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { if doc == nil { return false } if req.Opts.Status != "" && doc.Status != req.Opts.Status { return false } if req.Regex != nil && !req.Regex.MatchString(doc.Key) { return false } return true }