syncer/service/replicator/resource/resource.go (293 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package resource import ( "context" "encoding/json" "errors" "fmt" "strconv" "github.com/apache/servicecomb-service-center/eventbase/datasource" "github.com/apache/servicecomb-service-center/eventbase/model" "github.com/apache/servicecomb-service-center/eventbase/service/tombstone" "github.com/apache/servicecomb-service-center/pkg/log" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" "github.com/go-chassis/cari/sync" ) const ( Success int32 = iota Fail Skip MicroNonExist InstNonExist NonImplement ) const ( ResultStatusSkip = "skip" ResultStatusSuccess = "success" ResultStatusFail = "fail" ResultStatusMicroNonExist = "microNonExist" ResultStatusInstNonExist = "instNonExist" ResultStatusNonImplement = "nonImplement" ) var codeDescriber = map[int32]string{ Skip: ResultStatusSkip, Success: ResultStatusSuccess, Fail: ResultStatusFail, MicroNonExist: ResultStatusMicroNonExist, InstNonExist: ResultStatusInstNonExist, NonImplement: ResultStatusNonImplement, } type NewResource func(event *v1sync.Event) Resource var ( resources = map[string]NewResource{ Account: NewAccount, Role: NewRole, Microservice: NewMicroservice, Instance: NewInstance, Heartbeat: NewHeartbeat, Config: NewConfig, KV: NewKV, } ) func RegisterResources(name string, nr NewResource) { resources[name] = nr } func New(event *v1sync.Event) (Resource, *Result) { if event == nil { return nil, &Result{ Status: Fail, Message: "event is nil", } } r, ok := resources[event.Subject] if !ok { return nil, &Result{ Status: Skip, Message: fmt.Sprintf("resource %s not exist", event.Subject), } } return r(event), nil } type operator struct { a ActionHandler } func newOperator(a ActionHandler) *operator { return &operator{ a: a, } } func (o *operator) operate(ctx context.Context, action string) *Result { var err error switch action { case sync.CreateAction: err = o.a.CreateHandle(ctx) case sync.UpdateAction: err = o.a.UpdateHandle(ctx) case sync.DeleteAction: err = o.a.DeleteHandle(ctx) default: err = fmt.Errorf("action %s is invalid", action) } r := new(Result) r.Status = Success if err != nil { r.Message = err.Error() r.Status = Fail } return r } func NewResult(status int32, message string) *Result { return &Result{ Status: status, Message: message, } } func FailResult(err error) *Result { return NewResult(Fail, err.Error()) } func SuccessResult() *Result { return NewResult(Success, "") } func SkipResult() *Result { return NewResult(Skip, "") } type Result struct { EventID string Status int32 Message string } func (r *Result) WithMessage(m string) *Result { r.Message = m return r } func (r *Result) WithEventID(id string) *Result { r.EventID = id return r } func (r *Result) Flag() string { return fmt.Sprintf("eventID: %s, status %d:%s, message: %s", r.EventID, r.Status, codeDescriber[r.Status], r.Message) } func NonImplementResult() *Result { return NewResult(NonImplement, "") } type FailHandler interface { FailHandle(context.Context, int32) (*v1sync.Event, error) CanDrop() bool } type defaultFailHandler struct { } func (d *defaultFailHandler) FailHandle(context.Context, int32) (*v1sync.Event, error) { return nil, nil } func (d *defaultFailHandler) CanDrop() bool { return true } type OperateHandler interface { LoadCurrentResource(context.Context) *Result NeedOperate(context.Context) *Result Operate(context.Context) *Result } type Resource interface { OperateHandler FailHandler } type ActionHandler interface { CreateHandle(context.Context) error UpdateHandle(context.Context) error DeleteHandle(context.Context) error } func newInputParam(input interface{}, callback func()) *inputParam { return &inputParam{ input: input, callback: callback, } } type inputParam struct { input interface{} callback func() } func newInputLoader( event *v1sync.Event, create *inputParam, update *inputParam, delete *inputParam) *inputLoader { return &inputLoader{ event: event, createInput: create.input, createCallback: create.callback, updateInput: update.input, updateCallback: update.callback, deleteInput: delete.input, deleteCallback: delete.callback, } } type inputLoader struct { event *v1sync.Event createInput interface{} createCallback func() updateInput interface{} updateCallback func() deleteInput interface{} deleteCallback func() } func (i *inputLoader) loadInputUtil(value interface{}, callback func()) error { switch data := value.(type) { case *string: *data = string(i.event.Value) default: err := json.Unmarshal(i.event.Value, value) if err != nil { return err } } if callback != nil { callback() } return nil } func (i *inputLoader) loadCreateInput() error { return i.loadInputUtil(i.createInput, i.createCallback) } func (i *inputLoader) loadUpdateInput() error { return i.loadInputUtil(i.updateInput, i.updateCallback) } func (i *inputLoader) loadDeleteInput() error { return i.loadInputUtil(i.deleteInput, i.deleteCallback) } func (i *inputLoader) loadInput() error { switch i.event.Action { case sync.CreateAction: return i.loadCreateInput() case sync.UpdateAction: return i.loadUpdateInput() case sync.DeleteAction: return i.loadDeleteInput() default: return fmt.Errorf("invalid action %s", i.event.Action) } } type tombstoneLoader interface { get(ctx context.Context, req *model.GetTombstoneRequest) (*sync.Tombstone, error) } type checker struct { curNotNil bool event *v1sync.Event updateTime func() (int64, error) resourceID string tombstoneLoader tombstoneLoader } func formatUpdateTimeSecond(src string) (int64, error) { updateTime, err := strconv.ParseInt(src, 0, 0) if err != nil { return 0, err } return secToNanoSec(updateTime), nil } func secToNanoSec(timestamp int64) int64 { return timestamp * 1000 * 1000 * 1000 } func (o *checker) needOperate(ctx context.Context) *Result { if o.curNotNil { if o.updateTime == nil { return nil } updateTime, err := o.updateTime() if err != nil { log.Error("get update time failed", err) return FailResult(err) } if updateTime >= o.event.Timestamp { return SkipResult() } return nil } switch o.event.Action { case sync.CreateAction: return nil case sync.UpdateAction: if len(o.resourceID) == 0 { return nil } ts, err := o.tombstoneLoader.get(ctx, &model.GetTombstoneRequest{ ResourceType: o.event.Subject, ResourceID: o.resourceID, }) if err != nil { if errors.Is(err, datasource.ErrTombstoneNotExists) { return nil } return FailResult(err) } if ts.Timestamp > o.event.Timestamp { return SkipResult() } return nil case sync.DeleteAction: return SkipResult() default: return FailResult(fmt.Errorf("invalid action %s", o.event.Action)) } } func (o *checker) get(ctx context.Context, req *model.GetTombstoneRequest) (*sync.Tombstone, error) { return tombstone.Get(ctx, req) }