server/service/kv/kv_svc.go (310 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package kv import ( "context" "crypto/sha256" "fmt" "strings" "time" "github.com/apache/servicecomb-kie/pkg/util" "github.com/apache/servicecomb-kie/pkg/common" "github.com/apache/servicecomb-kie/pkg/concurrency" "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/stringutil" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/pubsub" "github.com/apache/servicecomb-kie/server/service/sync" "github.com/go-chassis/cari/config" "github.com/go-chassis/cari/pkg/errsvc" "github.com/go-chassis/foundation/validator" "github.com/go-chassis/go-chassis/v2/pkg/backends/quota" "github.com/go-chassis/openlog" ) var listSema = concurrency.NewSemaphore(concurrency.DefaultConcurrency) func ListKV(ctx context.Context, request *model.ListKVRequest) (int64, *model.KVResponse, *errsvc.Error) { opts := []datasource.FindOption{ datasource.WithKey(request.Key), datasource.WithLabels(request.Labels), datasource.WithOffset(request.Offset), datasource.WithLimit(request.Limit), } m := request.Match if m == common.PatternExact { opts = append(opts, datasource.WithExactLabels()) } if request.Status != "" { opts = append(opts, datasource.WithStatus(request.Status)) } rev, err := datasource.GetBroker().GetRevisionDao().GetRevision(ctx, request.Domain) if err != nil { return rev, nil, config.NewError(config.ErrInternal, err.Error()) } kv, err := List(ctx, request.Project, request.Domain, opts...) if err != nil { openlog.Error("common: " + err.Error()) return rev, nil, config.NewError(config.ErrInternal, common.MsgDBError) } return rev, kv, nil } // Create get latest revision from history // and increase revision of label // and insert key func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error) { if kv.Status == "" { kv.Status = common.StatusDisabled } err := validator.Validate(kv) if err != nil { return nil, config.NewError(config.ErrInvalidParams, err.Error()) } err = quota.PreCreate(kv.Domain, kv.Project, "", 1) if err != nil { if err == quota.ErrReached { openlog.Error(fmt.Sprintf("can not create kv %s@%s, due to quota violation", kv.Key, kv.Project)) return nil, config.NewError(config.ErrNotEnoughQuota, err.Error()) } openlog.Error(err.Error()) return nil, config.NewError(config.ErrInternal, "quota check failed") } if kv.Labels == nil { kv.Labels = map[string]string{} } kv.LabelFormat = stringutil.FormatMap(kv.Labels) if kv.ValueType == "" { kv.ValueType = datasource.DefaultValueType } //check whether the project has certain labels or not exist, err := datasource.GetBroker().GetKVDao().Exist( ctx, kv.Key, kv.Project, kv.Domain, datasource.WithLabelFormat(kv.LabelFormat), datasource.WithLabels(kv.Labels), ) if err != nil { openlog.Error(err.Error()) return nil, config.NewError(config.ErrInternal, "create kv failed") } if exist { return kv, config.NewError(config.ErrRecordAlreadyExists, datasource.ErrKVAlreadyExists.Error()) } revision, err := datasource.GetBroker().GetRevisionDao().ApplyRevision(ctx, kv.Domain) if err != nil { openlog.Error(err.Error()) return nil, config.NewError(config.ErrInternal, "create kv failed") } err = completeKV(kv, revision) if err != nil { openlog.Error(err.Error()) return nil, config.NewError(config.ErrInternal, "create kv failed") } kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv, datasource.WithSync(sync.FromContext(ctx))) if err != nil { openlog.Error(fmt.Sprintf("post err:%s", err.Error())) return nil, util.SvcErr(err) } err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv) if err != nil { openlog.Warn( fmt.Sprintf("can not updateKeyValue version for [%s] [%s] in [%s]", kv.Key, kv.Labels, kv.Domain)) } openlog.Debug(fmt.Sprintf("create %s with labels %s length [%d]", kv.Key, kv.Labels, len(kv.Value))) datasource.ClearPart(kv) return kv, nil } func completeKV(kv *model.KVDoc, revision int64) error { kv.ID = fmt.Sprintf("%x", sha256.Sum256([]byte(strings.Join([]string{ kv.Domain, kv.Project, kv.Key, kv.LabelFormat, }, "/")))) kv.UpdateRevision = revision kv.CreateRevision = revision now := time.Now().Unix() kv.CreateTime = now kv.UpdateTime = now return nil } func Upload(ctx context.Context, request *model.UploadKVRequest) *model.DocRespOfUpload { override := request.Override kvs := request.KVs result := &model.DocRespOfUpload{ Success: []*model.KVDoc{}, Failure: []*model.DocFailedOfUpload{}, } strategy := SelectStrategy(override) for i, kv := range kvs { if kv == nil { continue } kv.Domain = request.Domain kv.Project = request.Project kv, err := strategy.Execute(ctx, kv) if err != nil { if err.Code == config.ErrStopUpload { appendAbortFailedKVResult(kvs[i:], result) break } appendFailedKVResult(err, kv, result) continue } Publish(kv) result.Success = append(result.Success, kv) } return result } func appendFailedKVResult(err *errsvc.Error, kv *model.KVDoc, result *model.DocRespOfUpload) { failedKv := &model.DocFailedOfUpload{ Key: kv.Key, Labels: kv.Labels, ErrCode: err.Code, ErrMsg: err.Detail, } result.Failure = append(result.Failure, failedKv) } func appendAbortFailedKVResult(kvs []*model.KVDoc, result *model.DocRespOfUpload) { for _, kv := range kvs { failedKv := &model.DocFailedOfUpload{ Key: kv.Key, Labels: kv.Labels, ErrCode: config.ErrStopUpload, ErrMsg: "stop overriding kvs after reaching the duplicate kv", } result.Failure = append(result.Failure, failedKv) } } func Publish(kv *model.KVDoc) { err := pubsub.Publish(&pubsub.KVChangeEvent{ Key: kv.Key, Labels: kv.Labels, Project: kv.Project, DomainID: kv.Domain, Action: pubsub.ActionPut, }) if err != nil { openlog.Warn("lost kv change event when post:" + err.Error()) } openlog.Info(fmt.Sprintf("post [%s] success", kv.ID)) } // Update update key value and add new revision func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error) { oldKV, err := datasource.GetBroker().GetKVDao().Get(ctx, &model.GetKVRequest{ Domain: kv.Domain, Project: kv.Project, ID: kv.ID, }) if err != nil { return nil, err } if kv.Status != "" { oldKV.Status = kv.Status } if kv.Value != "" { oldKV.Value = kv.Value } oldKV.UpdateTime = time.Now().Unix() oldKV.UpdateRevision, err = datasource.GetBroker().GetRevisionDao().ApplyRevision(ctx, kv.Domain) if err != nil { return nil, err } err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV, datasource.WithSync(sync.FromContext(ctx))) if err != nil { return nil, err } openlog.Info( fmt.Sprintf("update %s with labels %s length [%d]", oldKV.Key, oldKV.Labels, len(kv.Value))) err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, oldKV) if err != nil { openlog.Error( fmt.Sprintf("can not add revision for [%s] [%s] in [%s],err: %s", oldKV.Key, oldKV.Labels, kv.Domain, err)) } openlog.Debug( fmt.Sprintf("add history %s with labels %s length [%d]", oldKV.Key, oldKV.Labels, len(oldKV.Value))) datasource.ClearPart(oldKV) return oldKV, nil } func FindOneAndDelete(ctx context.Context, kvID string, project, domain string) (*model.KVDoc, error) { kv, err := datasource.GetBroker().GetKVDao().FindOneAndDelete(ctx, kvID, project, domain, datasource.WithSync(sync.FromContext(ctx))) if err != nil { return nil, err } openlog.Info(fmt.Sprintf("delete success,kvID=%s", kvID)) if _, err := datasource.GetBroker().GetRevisionDao().ApplyRevision(ctx, domain); err != nil { openlog.Error(fmt.Sprintf("the kv [%s] is deleted, but increase revision failed: [%s]", kvID, err)) return nil, err } err = datasource.GetBroker().GetHistoryDao().DelayDeletionTime(ctx, []string{kvID}, project, domain) if err != nil { openlog.Error(fmt.Sprintf("add delete time to [%s] failed : [%s]", kvID, err)) } return kv, nil } func FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, error) { var kvs []*model.KVDoc var deleted int64 var err error kvs, deleted, err = datasource.GetBroker().GetKVDao().FindManyAndDelete(ctx, kvIDs, project, domain, datasource.WithSync(sync.FromContext(ctx))) if err != nil { return nil, err } if int64(len(kvs)) != deleted { openlog.Warn(fmt.Sprintf("The count of found and the count of deleted are not equal, found %d, deleted %d", len(kvs), deleted)) } else { openlog.Info(fmt.Sprintf("deleted %d kvs, their ids are %v", deleted, kvIDs)) } if _, err := datasource.GetBroker().GetRevisionDao().ApplyRevision(ctx, domain); err != nil { openlog.Error(fmt.Sprintf("kvs [%v] are deleted, but increase revision failed: [%v]", kvIDs, err)) return nil, err } err = datasource.GetBroker().GetHistoryDao().DelayDeletionTime(ctx, kvIDs, project, domain) if err != nil { openlog.Error(fmt.Sprintf("add delete time to kvs [%s] failed : [%s]", kvIDs, err)) } return kvs, nil } func Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, error) { return datasource.GetBroker().GetKVDao().Get(ctx, req) } func List(ctx context.Context, project, domain string, options ...datasource.FindOption) (*model.KVResponse, error) { listSema.Acquire() defer listSema.Release() return datasource.GetBroker().GetKVDao().List(ctx, project, domain, options...) } func Exist(ctx context.Context, key, project, domain string, labels map[string]string) (bool, error) { if labels == nil { labels = map[string]string{} } labelFormat := stringutil.FormatMap(labels) exist, err := datasource.GetBroker().GetKVDao().Exist( ctx, key, project, domain, datasource.WithLabelFormat(labelFormat), datasource.WithLabels(labels), ) if err != nil { openlog.Error(err.Error()) return false, err } return exist, nil } func GetByKey(ctx context.Context, key, project, domain string, labels map[string]string) ([]*model.KVDoc, error) { if labels == nil { labels = map[string]string{} } labelFormat := stringutil.FormatMap(labels) kvs, err := datasource.GetBroker().GetKVDao().GetByKey( ctx, key, project, domain, datasource.WithLabelFormat(labelFormat), datasource.WithLabels(labels), ) if err != nil { openlog.Error(err.Error()) return nil, err } return kvs, nil }