syncer/service/replicator/resource/kv.go (134 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package resource import ( "context" "encoding/json" "errors" "fmt" "sync" "github.com/apache/servicecomb-service-center/pkg/log" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" "github.com/little-cui/etcdadpt" ) const ( KV = "kv" ComparableKey = "comparable" ) const ( KVKey = "key" KVKeyNonExist = "key not exist in opts" ) var ( manager KeyManager ErrRecordNonExist = errors.New("record non exist") ) func NewKV(e *v1sync.Event) Resource { r := &kv{ event: e, manager: keyManage(), } return r } type kv struct { event *v1sync.Event key string manager KeyManager tombstoneLoader tombstoneLoader cur []byte defaultFailHandler } func (k *kv) LoadCurrentResource(ctx context.Context) *Result { key, ok := k.event.Opts[KVKey] if !ok { return NewResult(Fail, KVKeyNonExist) } k.key = key value, err := k.manager.Get(ctx, key) if err != nil { if errors.Is(err, ErrRecordNonExist) { return nil } return FailResult(err) } k.cur = value return nil } type Value struct { Timestamp int64 `json:"$timestamp"` } func (k *kv) getUpdateTime() (int64, error) { if k.cur == nil { return 0, nil } comparable, ok := k.event.Opts[ComparableKey] if !ok || comparable != "true" { return 0, nil } v := new(Value) err := json.Unmarshal(k.cur, v) if err != nil { log.Warn(fmt.Sprintf("unmarshal kv %s value failed, err %s", k.key, err.Error())) return 0, err } return v.Timestamp, nil } func (k *kv) NeedOperate(ctx context.Context) *Result { c := &checker{ curNotNil: k.cur != nil, event: k.event, updateTime: k.getUpdateTime, resourceID: k.key, } c.tombstoneLoader = c if k.tombstoneLoader != nil { c.tombstoneLoader = k.tombstoneLoader } return c.needOperate(ctx) } func (k *kv) CreateHandle(ctx context.Context) error { return k.manager.Post(ctx, k.key, k.event.Value) } func (k *kv) UpdateHandle(ctx context.Context) error { return k.manager.Put(ctx, k.key, k.event.Value) } func (k *kv) DeleteHandle(ctx context.Context) error { return k.manager.Delete(ctx, k.key) } var once sync.Once func keyManage() KeyManager { once.Do(InitManager) return manager } func (k *kv) Operate(ctx context.Context) *Result { return newOperator(k).operate(ctx, k.event.Action) } type KeyManager interface { Get(ctx context.Context, key string) ([]byte, error) Put(ctx context.Context, key string, value []byte) error Post(ctx context.Context, key string, value []byte) error Delete(ctx context.Context, key string) error } type etcdManager struct { } func InitManager() { manager = new(etcdManager) } func (e *etcdManager) Get(ctx context.Context, key string) ([]byte, error) { r, err := etcdadpt.Get(ctx, key) if err != nil { return nil, err } if r == nil { return nil, ErrRecordNonExist } return r.Value, nil } func (e etcdManager) Put(ctx context.Context, key string, value []byte) error { return etcdadpt.Put(ctx, key, string(value)) } func (e etcdManager) Post(ctx context.Context, key string, value []byte) error { return etcdadpt.Put(ctx, key, string(value)) } func (e etcdManager) Delete(ctx context.Context, key string) error { _, err := etcdadpt.Delete(ctx, key) return err }