syncer/service/replicator/resource/heartbeat.go (117 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package resource import ( "context" "encoding/json" "fmt" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/cari/pkg/errsvc" "github.com/go-chassis/cari/sync" ) const ( Heartbeat = "heartbeat" ) func NewHeartbeat(e *v1sync.Event) Resource { h := &heartbeat{ event: e, } h.manager = new(metadataManage) return h } type heartbeat struct { event *v1sync.Event input *pb.HeartbeatRequest manager metadataManager defaultFailHandler } func (h *heartbeat) loadInput() error { h.input = new(pb.HeartbeatRequest) return json.Unmarshal(h.event.Value, h.input) } func (h *heartbeat) LoadCurrentResource(_ context.Context) *Result { err := h.loadInput() if err != nil { return FailResult(err) } return nil } func (h *heartbeat) NeedOperate(context.Context) *Result { return nil } func MicroNonExistResult() *Result { return NewResult(MicroNonExist, "") } func InstNonExistResult() *Result { return NewResult(InstNonExist, "") } func (h *heartbeat) Operate(ctx context.Context) *Result { err := h.manager.SendHeartbeat(ctx, h.input) if err == nil { return SuccessResult() } log.Warn(fmt.Sprintf("send heartbeat failed, %s, %s", h.input.ServiceId, h.input.InstanceId)) _, err = h.manager.GetInstance(ctx, &pb.GetOneInstanceRequest{ ProviderServiceId: h.input.ServiceId, ProviderInstanceId: h.input.InstanceId, }) if err != nil { if errsvc.IsErrEqualCode(err, pb.ErrServiceNotExists) { return MicroNonExistResult() } if errsvc.IsErrEqualCode(err, pb.ErrInstanceNotExists) { return InstNonExistResult() } return FailResult(err) } log.Info(fmt.Sprintf("get instance return exist, %s, %s", h.input.ServiceId, h.input.InstanceId)) return SuccessResult() } func (h *heartbeat) FailHandle(ctx context.Context, code int32) (*v1sync.Event, error) { if code != InstNonExist { return nil, nil } err := h.loadInput() if err != nil { return nil, err } log.Info(fmt.Sprintf("instance %s,%s not exist, start rebuild instance", h.input.ServiceId, h.input.InstanceId)) return h.rebuildInstance(ctx) } func (h *heartbeat) rebuildInstance(ctx context.Context) (*v1sync.Event, error) { ctx = util.SetDomain(ctx, h.event.Opts[string(util.CtxDomain)]) ctx = util.SetProject(ctx, h.event.Opts[string(util.CtxProject)]) cur, err := h.manager.GetInstance(ctx, &pb.GetOneInstanceRequest{ ProviderServiceId: h.input.ServiceId, ProviderInstanceId: h.input.InstanceId, }) if err != nil { if errsvc.IsErrEqualCode(err, pb.ErrInstanceNotExists) { log.Warn(fmt.Sprintf("instance %s,%s not exist", h.input.ServiceId, h.input.InstanceId)) return nil, nil } return nil, err } value, err := json.Marshal(cur) if err != nil { return nil, err } eventID, err := v1sync.NewEventID() if err != nil { log.Error("fail to create eventID", err) return nil, err } return &v1sync.Event{ Id: eventID, Action: sync.CreateAction, Subject: Instance, Opts: h.event.Opts, Value: value, Timestamp: v1sync.Timestamp(), }, nil }