syncer/service/replicator/resource/instance.go (183 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package resource import ( "context" "fmt" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/cari/pkg/errsvc" ) const ( Instance = "instance" ) func NewInstance(e *v1sync.Event) Resource { i := &instance{ event: e, } i.manager = new(metadataManage) return i } type instance struct { event *v1sync.Event createInput *pb.RegisterInstanceRequest updateInput *pb.MicroServiceInstance deleteInput *pb.UnregisterInstanceRequest serviceID string instanceID string service *pb.MicroService cur *pb.MicroServiceInstance manager metadataManager } func (i *instance) loadInput() error { i.createInput = new(pb.RegisterInstanceRequest) cre := newInputParam(i.createInput, func() { i.serviceID = i.createInput.Instance.ServiceId i.instanceID = i.createInput.Instance.InstanceId }) i.updateInput = new(pb.MicroServiceInstance) upd := newInputParam(i.updateInput, func() { i.serviceID = i.updateInput.ServiceId i.instanceID = i.updateInput.InstanceId }) i.deleteInput = new(pb.UnregisterInstanceRequest) del := newInputParam(i.deleteInput, func() { i.serviceID = i.deleteInput.ServiceId i.instanceID = i.deleteInput.InstanceId }) return newInputLoader( i.event, cre, upd, del, ).loadInput() } type metadataManage struct { } func (m *metadataManage) RegisterService(ctx context.Context, request *pb.CreateServiceRequest) (*pb.CreateServiceResponse, error) { return datasource.GetMetadataManager().RegisterService(ctx, request) } func (m *metadataManage) GetService(ctx context.Context, in *pb.GetServiceRequest) (*pb.MicroService, error) { return datasource.GetMetadataManager().GetService(ctx, in) } func (m *metadataManage) PutServiceProperties(ctx context.Context, request *pb.UpdateServicePropsRequest) error { return datasource.GetMetadataManager().PutServiceProperties(ctx, request) } func (m *metadataManage) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) error { return datasource.GetMetadataManager().UnregisterService(ctx, request) } func (m *metadataManage) RegisterInstance(ctx context.Context, in *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) { return datasource.GetMetadataManager().RegisterInstance(ctx, in) } func (m *metadataManage) SendHeartbeat(ctx context.Context, in *pb.HeartbeatRequest) error { return datasource.GetMetadataManager().SendHeartbeat(ctx, in) } func (m *metadataManage) GetInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) { return datasource.GetMetadataManager().GetInstance(ctx, in) } func (m *metadataManage) PutInstance(ctx context.Context, in *pb.RegisterInstanceRequest) error { return datasource.GetMetadataManager().PutInstance(ctx, in) } func (m *metadataManage) UnregisterInstance(ctx context.Context, in *pb.UnregisterInstanceRequest) error { return datasource.GetMetadataManager().UnregisterInstance(ctx, in) } type metadataManager interface { serviceManager instanceManager } type instanceManager interface { RegisterInstance(ctx context.Context, in *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) SendHeartbeat(ctx context.Context, in *pb.HeartbeatRequest) error GetInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) PutInstance(ctx context.Context, in *pb.RegisterInstanceRequest) error UnregisterInstance(ctx context.Context, in *pb.UnregisterInstanceRequest) error } func (i *instance) LoadCurrentResource(ctx context.Context) *Result { err := i.loadInput() if err != nil { return FailResult(err) } serviceID := i.serviceID service, err := i.manager.GetService(ctx, &pb.GetServiceRequest{ ServiceId: serviceID, }) if err != nil { if errsvc.IsErrEqualCode(err, pb.ErrServiceNotExists) { log.Warn(fmt.Sprintf("instance service not exist, %s", i.event.Flag())) return MicroNonExistResult() } return FailResult(err) } i.service = service inst, err := i.manager.GetInstance(ctx, &pb.GetOneInstanceRequest{ ProviderServiceId: serviceID, ProviderInstanceId: i.instanceID, }) if err != nil { if errsvc.IsErrEqualCode(err, pb.ErrInstanceNotExists) { return nil } return FailResult(err) } i.cur = inst.Instance return nil } func (i *instance) NeedOperate(ctx context.Context) *Result { c := &checker{ curNotNil: i.cur != nil, event: i.event, updateTime: func() (int64, error) { return formatUpdateTimeSecond(i.cur.ModTimestamp) }, resourceID: "", } c.tombstoneLoader = c return c.needOperate(ctx) } func (i *instance) FailHandle(ctx context.Context, code int32) (*v1sync.Event, error) { if code != MicroNonExist { return nil, nil } err := i.loadInput() if err != nil { return nil, err } ctx = util.SetDomainProject(ctx, i.event.Opts[string(util.CtxDomain)], i.event.Opts[string(util.CtxProject)]) serviceID := i.serviceID _, err = i.manager.GetService(ctx, &pb.GetServiceRequest{ ServiceId: serviceID, }) if err != nil { if errsvc.IsErrEqualCode(err, pb.ErrServiceNotExists) { log.Warn(fmt.Sprintf("service not exist %s, %s", serviceID, i.event.Flag())) return nil, nil } return nil, err } return i.event, nil } func (i *instance) CanDrop() bool { return false } func (i *instance) Operate(ctx context.Context) *Result { return newOperator(i).operate(ctx, i.event.Action) } func (i *instance) CreateHandle(ctx context.Context) error { _, err := i.manager.RegisterInstance(ctx, i.createInput) return err } func (i *instance) UpdateHandle(ctx context.Context) error { return i.manager.PutInstance(ctx, &pb.RegisterInstanceRequest{ Instance: i.updateInput, }) } func (i *instance) DeleteHandle(ctx context.Context) error { return i.manager.UnregisterInstance(ctx, i.deleteInput) }