server/service/registry/registry.go (124 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package registry import ( "context" "fmt" "time" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/core" discosvc "github.com/apache/servicecomb-service-center/server/service/disco" "github.com/apache/servicecomb-service-center/server/service/sync" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/cari/pkg/errsvc" "github.com/go-chassis/foundation/gopool" ) func addDefaultContextValue(ctx context.Context) context.Context { ctx = util.WithNoCache(ctx) // set default domain/project ctx = util.SetDomainProject(ctx, datasource.RegistryDomain, datasource.RegistryProject) // register without quota check ctx = util.SetContext(ctx, core.CtxScSelf, true) // is a sync operation return sync.SetContext(ctx) } func SelfRegister(ctx context.Context) error { err := selfRegister(ctx) if err != nil { return err } // start send heart beat job autoSelfHeartBeat() return nil } func selfRegister(pCtx context.Context) error { ctx := addDefaultContextValue(pCtx) err := registerService(ctx) if err != nil { return err } // 实例信息 return registerInstance(ctx) } func registerService(ctx context.Context) error { serviceID, err := discosvc.ExistService(ctx, core.GetExistenceRequest()) if err != nil { log.Error("query service center existence failed", err) if !errsvc.IsErrEqualCode(err, pb.ErrServiceNotExists) && !errsvc.IsErrEqualCode(err, pb.ErrServiceVersionNotExists) { return err } return registerNewService(ctx) } log.Warn(fmt.Sprintf("service center service[%s] already registered", serviceID)) core.Service, err = discosvc.GetService(ctx, core.GetServiceRequest(serviceID)) if err != nil { log.Error(fmt.Sprintf("query service center service[%s] info failed", serviceID), err) return err } return nil } func registerNewService(ctx context.Context) error { respS, err := discosvc.RegisterService(ctx, core.CreateServiceRequest()) if err != nil { log.Error("register service center failed", err) return err } core.Service.ServiceId = respS.ServiceId log.Info(fmt.Sprintf("register service center service[%s]", respS.ServiceId)) return nil } func registerInstance(ctx context.Context) error { core.Instance.InstanceId = "" core.Instance.ServiceId = core.Service.ServiceId respI, err := discosvc.RegisterInstance(ctx, core.RegisterInstanceRequest()) if err != nil { log.Error("register failed", err) return err } core.Instance.InstanceId = respI.InstanceId log.Info(fmt.Sprintf("register service center instance[%s/%s], endpoints is %s", core.Service.ServiceId, respI.InstanceId, core.Instance.Endpoints)) return nil } func selfHeartBeat(pCtx context.Context) error { ctx := addDefaultContextValue(pCtx) err := discosvc.SendHeartbeat(ctx, core.HeartbeatRequest()) if err != nil { log.Error("send heartbeat failed", err) return err } log.Debug(fmt.Sprintf("update service center instance[%s/%s] heartbeat", core.Instance.ServiceId, core.Instance.InstanceId)) return nil } func autoSelfHeartBeat() { gopool.Go(func(ctx context.Context) { for { select { case <-ctx.Done(): return case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second): err := selfHeartBeat(ctx) if err == nil { continue } //服务不存在,创建服务 err = selfRegister(ctx) if err != nil { log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed", core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err) } } } }) } func SelfUnregister(pCtx context.Context) error { if len(core.Instance.InstanceId) == 0 { return nil } ctx := addDefaultContextValue(pCtx) err := discosvc.UnregisterInstance(ctx, core.UnregisterInstanceRequest()) if err != nil { log.Error(fmt.Sprintf("unregister service center instance[%s/%s] failed", core.Instance.ServiceId, core.Instance.InstanceId), err) return err } log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]", core.Service.ServiceId, core.Instance.InstanceId)) return nil }