server/datasource/mongo/kv/kv_dao.go (667 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package kv import ( "context" "fmt" "regexp" "strings" dmongo "github.com/go-chassis/cari/db/mongo" "github.com/go-chassis/cari/sync" "github.com/go-chassis/openlog" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/util" "github.com/apache/servicecomb-kie/server/datasource" mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model" ) const ( MsgFindKvFailed = "find kv failed, deadline exceeded" FmtErrFindKvFailed = "can not find kv in %s" ) // Dao operate data in mongodb type Dao struct { } func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) (*model.KVDoc, error) { opts := datasource.NewWriteOptions(options...) if opts.SyncEnable { // if syncEnable is true, will create kv with task return txnCreate(ctx, kv) } return create(ctx, kv) } func create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) _, err := collection.InsertOne(ctx, kv) if err != nil { openlog.Error("create error", openlog.WithTags(openlog.Tags{ "err": err.Error(), "kv": kv, })) return nil, err } return kv, nil } // txnCreate is to start transaction when creating KV, will create task in a transaction operation func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) { taskSession, err := dmongo.GetClient().GetDB().Client().StartSession() if err != nil { return nil, err } if err = taskSession.StartTransaction(); err != nil { return nil, err } defer taskSession.EndSession(ctx) if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) _, err = collection.InsertOne(sessionContext, kv) if err != nil { openlog.Error("create error", openlog.WithTags(openlog.Tags{ "err": err.Error(), "kv": kv, })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "kv": kv, })) } return err } task, err := sync.NewTask(kv.Domain, kv.Project, sync.CreateAction, datasource.ConfigResource, kv) if err != nil { openlog.Error("fail to create task" + err.Error()) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "kv": kv, })) } return err } collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask) _, err = collection.InsertOne(sessionContext, task) if err != nil { openlog.Error("create task error", openlog.WithTags(openlog.Tags{ "err": err.Error(), "task": task, })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "task": task, })) } return err } if err = taskSession.CommitTransaction(sessionContext); err != nil { return err } return nil }); err != nil { openlog.Error(err.Error()) return nil, err } return kv, nil } // Update update key value func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource.WriteOption) error { opts := datasource.NewWriteOptions(options...) // if syncEnable is true, will create kv with task if opts.SyncEnable { return txnUpdate(ctx, kv) } return update(ctx, kv) } func update(ctx context.Context, kv *model.KVDoc) error { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) _, err := collection.UpdateOne(ctx, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{ {Key: "$set", Value: bson.D{ {Key: "value", Value: kv.Value}, {Key: "status", Value: kv.Status}, {Key: "checker", Value: kv.Checker}, {Key: "update_time", Value: kv.UpdateTime}, {Key: "update_revision", Value: kv.UpdateRevision}, }}, }) if err != nil { return err } return nil } // txnUpdate is to start transaction when updating kV, will create task in a transaction operation func txnUpdate(ctx context.Context, kv *model.KVDoc) error { taskSession, err := dmongo.GetClient().GetDB().Client().StartSession() if err != nil { return err } if err = taskSession.StartTransaction(); err != nil { return err } defer taskSession.EndSession(ctx) if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) result := collection.FindOneAndUpdate(sessionContext, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{ {Key: "$set", Value: bson.D{ {Key: "value", Value: kv.Value}, {Key: "status", Value: kv.Status}, {Key: "checker", Value: kv.Checker}, {Key: "update_time", Value: kv.UpdateTime}, {Key: "update_revision", Value: kv.UpdateRevision}, }}, }) if result.Err() != nil { errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "kv": kv, })) } if result.Err() == mongo.ErrNoDocuments { return datasource.ErrKeyNotExists } return result.Err() } curKV := &model.KVDoc{} err := result.Decode(curKV) if err != nil { errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "kv": kv, })) } openlog.Error("decode error: " + err.Error()) return err } task, err := sync.NewTask(kv.Domain, kv.Project, sync.UpdateAction, datasource.ConfigResource, curKV) if err != nil { openlog.Error("fail to create task" + err.Error()) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "kv": kv, })) } return err } collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask) _, err = collection.InsertOne(sessionContext, task) if err != nil { openlog.Error("create task error", openlog.WithTags(openlog.Tags{ "err": err.Error(), "task": task, })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "task": task, })) } return err } if err = taskSession.CommitTransaction(sessionContext); err != nil { return err } return nil }); err != nil { openlog.Error(err.Error()) return err } return nil } // Extract key values func getValue(str string) string { rex := regexp.MustCompile(`\(([^)]+)\)`) res := rex.FindStringSubmatch(str) return res[len(res)-1] } func findKV(ctx context.Context, domain string, project string, opts datasource.FindOptions) (*mongo.Cursor, int, error) { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) ctx, cancel := context.WithTimeout(ctx, opts.Timeout) defer cancel() filter := bson.M{"domain": domain, "project": project} if opts.Key != "" { filter["key"] = opts.Key switch { case strings.HasPrefix(opts.Key, "beginWith("): value := strings.ReplaceAll(getValue(opts.Key), ".", "\\.") filter["key"] = bson.M{"$regex": "^" + value + ".*$", "$options": "$i"} case strings.HasPrefix(opts.Key, "wildcard("): value := strings.ReplaceAll(getValue(opts.Key), ".", "\\.") value = strings.ReplaceAll(value, "*", ".*") filter["key"] = bson.M{"$regex": "^" + value + "$", "$options": "$i"} } } if len(opts.Labels) != 0 { for k, v := range opts.Labels { filter["labels."+k] = v } } opt := options.Find().SetSort(map[string]interface{}{ "update_revision": -1, }) if opts.Limit > 0 { opt = opt.SetLimit(opts.Limit) opt = opt.SetSkip(opts.Offset) } curTotal, err := collection.CountDocuments(ctx, filter) if err != nil { if err.Error() == context.DeadlineExceeded.Error() { openlog.Error(MsgFindKvFailed, openlog.WithTags(openlog.Tags{ "timeout": opts.Timeout, })) return nil, 0, fmt.Errorf(FmtErrFindKvFailed, opts.Timeout) } return nil, 0, err } if opts.Status != "" { filter["status"] = opts.Status } cur, err := collection.Find(ctx, filter, opt) if err != nil { if err.Error() == context.DeadlineExceeded.Error() { openlog.Error(MsgFindKvFailed, openlog.WithTags(openlog.Tags{ "timeout": opts.Timeout, })) return nil, 0, fmt.Errorf(FmtErrFindKvFailed, opts.Timeout) } return nil, 0, err } return cur, int(curTotal), err } func findOneKey(ctx context.Context, filter bson.M) ([]*model.KVDoc, error) { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) sr := collection.FindOne(ctx, filter) if sr.Err() != nil { if sr.Err() == mongo.ErrNoDocuments { return nil, datasource.ErrKeyNotExists } return nil, sr.Err() } curKV := &model.KVDoc{} err := sr.Decode(curKV) if err != nil { openlog.Error("decode error: " + err.Error()) return nil, err } return []*model.KVDoc{curKV}, nil } // Exist supports you query a key value by label map or labels id func (s *Dao) Exist(ctx context.Context, key, project, domain string, options ...datasource.FindOption) (bool, error) { opts := datasource.FindOptions{} for _, o := range options { o(&opts) } if opts.LabelFormat != "" { _, err := findKVByLabel(ctx, domain, opts.LabelFormat, key, project) if err != nil { if err == datasource.ErrKeyNotExists { return false, nil } openlog.Error(err.Error()) return false, err } return true, nil } kvs, err := s.List(ctx, domain, project, datasource.WithExactLabels(), datasource.WithLabels(opts.Labels), datasource.WithKey(key)) if err != nil { openlog.Error("check kv exist: " + err.Error()) return false, err } if len(kvs.Data) != 1 { return false, datasource.ErrTooMany } return true, nil } // FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion // domain=tenant func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string, options ...datasource.WriteOption) (*model.KVDoc, error) { opts := datasource.NewWriteOptions(options...) if opts.SyncEnable { // if syncEnable is ture, will delete kv, create task and create tombstone return txnFindOneAndDelete(ctx, kvID, project, domain) } return findOneAndDelete(ctx, kvID, project, domain) } func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) { curKV := &model.KVDoc{} collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) sr := collection.FindOneAndDelete(ctx, bson.M{"id": kvID, "project": project, "domain": domain}) if sr.Err() != nil { if sr.Err() == mongo.ErrNoDocuments { return nil, datasource.ErrKeyNotExists } return nil, sr.Err() } err := sr.Decode(curKV) if err != nil { openlog.Error("decode error: " + err.Error()) return nil, err } return curKV, nil } // txnFindOneAndDelete is to start transaction when delete KV, will create task and tombstone in a transaction operation func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) { curKV := &model.KVDoc{} taskSession, err := dmongo.GetClient().GetDB().Client().StartSession() if err != nil { openlog.Error("fail to start session" + err.Error()) return nil, err } if err = taskSession.StartTransaction(); err != nil { openlog.Error("fail to start transaction" + err.Error()) return nil, err } defer taskSession.EndSession(ctx) if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) sr := collection.FindOneAndDelete(sessionContext, bson.M{"id": kvID, "project": project, "domain": domain}) if sr.Err() != nil { errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), })) return errAbort } if sr.Err() == mongo.ErrNoDocuments { openlog.Error(datasource.ErrKeyNotExists.Error()) return datasource.ErrKeyNotExists } return sr.Err() } err := sr.Decode(curKV) if err != nil { openlog.Error("decode error: " + err.Error()) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), })) return errAbort } return err } task, err := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, curKV) if err != nil { openlog.Error("fail to create task" + err.Error()) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), })) return errAbort } return err } collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask) _, err = collection.InsertOne(sessionContext, task) if err != nil { openlog.Error("create task error", openlog.WithTags(openlog.Tags{ "err": err.Error(), "task": task, })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "task": task, })) } return err } tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(curKV)) collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTombstone) _, err = collection.InsertOne(sessionContext, tombstone) if err != nil { openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{ "err": err.Error(), "tombstone": tombstone, })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), "tombstone": tombstone, })) } return err } if err = taskSession.CommitTransaction(sessionContext); err != nil { return err } return nil }); err != nil { openlog.Error(err.Error()) return nil, err } return curKV, nil } // FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion func (s *Dao) FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string, options ...datasource.WriteOption) ([]*model.KVDoc, int64, error) { opts := datasource.NewWriteOptions(options...) if opts.SyncEnable { // if sync enable is true, will delete kvs, create tasks and tombstones return txnFindManyAndDelete(ctx, kvIDs, project, domain) } return findManyAndDelete(ctx, kvIDs, project, domain) } func findManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) { filter := bson.D{ {Key: "id", Value: bson.M{"$in": kvIDs}}, {Key: "project", Value: project}, {Key: "domain", Value: domain}} kvs, err := findKeys(ctx, filter, false) if err != nil { if err != datasource.ErrKeyNotExists { openlog.Error("find Keys error: " + err.Error()) } return nil, 0, err } collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) dr, err := collection.DeleteMany(ctx, filter) if err != nil { openlog.Error(fmt.Sprintf("delete kvs [%v] failed : [%v]", kvIDs, err)) return nil, 0, err } return kvs, dr.DeletedCount, nil } // txnFindManyAndDelete is to start transaction when delete KVs, will create tasks and tombstones in a transaction operation func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) { filter := bson.D{ {Key: "id", Value: bson.M{"$in": kvIDs}}, {Key: "project", Value: project}, {Key: "domain", Value: domain}} kvs, err := findKeys(ctx, filter, false) if err != nil { if err != datasource.ErrKeyNotExists { openlog.Error("find Keys error: " + err.Error()) } return nil, 0, err } var deletedCount int64 taskSession, err := dmongo.GetClient().GetDB().Client().StartSession() if err != nil { openlog.Error("fail to start session" + err.Error()) return nil, 0, err } if err = taskSession.StartTransaction(); err != nil { openlog.Error("fail to start transaction" + err.Error()) return nil, 0, err } defer taskSession.EndSession(ctx) if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) filter := bson.D{ {Key: "id", Value: bson.M{"$in": kvIDs}}, {Key: "project", Value: project}, {Key: "domain", Value: domain}} dr, err := collection.DeleteMany(sessionContext, filter) if err != nil { errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), })) return errAbort } openlog.Error(fmt.Sprintf("delete kvs [%v] failed : [%v]", kvIDs, err)) return err } deletedCount = dr.DeletedCount tasksDoc := make([]interface{}, deletedCount) tombstonesDoc := make([]interface{}, deletedCount) for i := 0; i < int(deletedCount); i++ { kv := kvs[i] task, _ := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, kv) tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(kv)) tasksDoc[i] = task tombstonesDoc[i] = tombstone } collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask) _, err = collection.InsertMany(sessionContext, tasksDoc) if err != nil { openlog.Error("create tasks error", openlog.WithTags(openlog.Tags{ "err": err.Error(), })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), })) } return err } collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTombstone) _, err = collection.InsertMany(sessionContext, tombstonesDoc) if err != nil { openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{ "err": err.Error(), })) errAbort := taskSession.AbortTransaction(sessionContext) if errAbort != nil { openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{ "err": errAbort.Error(), })) } return err } if err = taskSession.CommitTransaction(sessionContext); err != nil { return err } return nil }); err != nil { openlog.Error(err.Error()) return nil, 0, err } return kvs, deletedCount, nil } func findKeys(ctx context.Context, filter interface{}, withoutLabel bool) ([]*model.KVDoc, error) { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) cur, err := collection.Find(ctx, filter) if err != nil { if err.Error() == context.DeadlineExceeded.Error() { openlog.Error("find kvs failed: " + err.Error()) return nil, fmt.Errorf("can not find keys due to timout") } return nil, err } defer cur.Close(ctx) if cur.Err() != nil { return nil, err } kvs := make([]*model.KVDoc, 0) for cur.Next(ctx) { curKV := &model.KVDoc{} if err := cur.Decode(curKV); err != nil { openlog.Error("decode to KVs error: " + err.Error()) return nil, err } if withoutLabel { curKV.Labels = nil } kvs = append(kvs, curKV) } if len(kvs) == 0 { return nil, datasource.ErrKeyNotExists } return kvs, nil } // findKVByLabel get kvs by key and label // key can be empty, then it will return all key values // if key is given, will return 0-1 key value func findKVByLabel(ctx context.Context, domain, labelFormat, key string, project string) ([]*model.KVDoc, error) { filter := bson.M{"label_format": labelFormat, "domain": domain, "project": project} if key != "" { filter["key"] = key return findOneKey(ctx, filter) } return findKeys(ctx, filter, true) } // Get get kv by kv id func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, error) { filter := bson.M{"id": req.ID, "domain": req.Domain, "project": req.Project} kvs, err := findOneKey(ctx, filter) if err != nil { openlog.Error(err.Error()) return nil, err } return kvs[0], nil } func (s *Dao) GetByKey(ctx context.Context, key, project, domain string, options ...datasource.FindOption) ([]*model.KVDoc, error) { opts := datasource.FindOptions{} for _, o := range options { o(&opts) } if opts.LabelFormat != "" { kvs, err := findKVByLabel(ctx, domain, opts.LabelFormat, key, project) if err != nil { return nil, err } return kvs, nil } kvs, err := s.List(ctx, domain, project, datasource.WithExactLabels(), datasource.WithLabels(opts.Labels), datasource.WithKey(key)) if err != nil { openlog.Error("check kv exist: " + err.Error()) return nil, err } if len(kvs.Data) != 1 { return nil, datasource.ErrTooMany } return kvs.Data, nil } func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) { collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV) filter := bson.M{"domain": domain, "project": project} total, err := collection.CountDocuments(ctx, filter) if err != nil { openlog.Error("find total number: " + err.Error()) return 0, err } return total, err } // List get kv list by key and criteria func (s *Dao) List(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { opts := datasource.NewDefaultFindOpts() for _, o := range options { o(&opts) } cur, total, err := findKV(ctx, domain, project, opts) if err != nil { return nil, err } defer cur.Close(ctx) result := &model.KVResponse{ Data: []*model.KVDoc{}, } for cur.Next(ctx) { curKV := &model.KVDoc{} if err := cur.Decode(curKV); err != nil { openlog.Error("decode to KVs error: " + err.Error()) return nil, err } if opts.ExactLabels { if !util.IsEquivalentLabel(opts.Labels, curKV.Labels) { continue } } datasource.ClearPart(curKV) result.Data = append(result.Data, curKV) } result.Total = total return result, nil }