server/service/disco/metadata.go (213 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disco
import (
"context"
"fmt"
"strconv"
"time"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
"github.com/apache/servicecomb-service-center/server/service/validator"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/foundation/gopool"
)
func RegisterService(ctx context.Context, request *pb.CreateServiceRequest) (*pb.CreateServiceResponse, error) {
//create service
resp, err := registerService(ctx, request)
if err != nil {
return nil, err
}
if !hasServiceDetails(request) {
return resp, nil
}
//create tag,rule,instances
return registerServiceDetails(ctx, request, resp.ServiceId)
}
func registerService(ctx context.Context, request *pb.CreateServiceRequest) (*pb.CreateServiceResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
if request == nil || request.Service == nil {
log.Error(fmt.Sprintf("create micro-service failed: request body is empty, operator: %s", remoteIP), nil)
return nil, pb.NewError(pb.ErrInvalidParams, "Request body is empty")
}
service := request.Service
serviceFlag := util.StringJoin([]string{
service.Environment, service.AppId, service.ServiceName, service.Version}, "/")
datasource.SetServiceDefaultValue(service)
if err := validator.ValidateCreateServiceRequest(request); err != nil {
log.Error(fmt.Sprintf("create micro-service[%s] failed, operator: %s", serviceFlag, remoteIP), err)
return nil, pb.NewError(pb.ErrInvalidParams, err.Error())
}
if quotaErr := checkServiceQuota(ctx); quotaErr != nil {
log.Error(fmt.Sprintf("create micro-service[%s] failed, operator: %s", serviceFlag, remoteIP), quotaErr)
return nil, quotaErr
}
assignDefaultValue(service)
return datasource.GetMetadataManager().RegisterService(ctx, request)
}
func assignDefaultValue(service *pb.MicroService) {
formatTenBase := 10
service.Timestamp = strconv.FormatInt(time.Now().Unix(), formatTenBase)
service.ModTimestamp = service.Timestamp
}
func registerServiceDetails(ctx context.Context, in *pb.CreateServiceRequest, serviceID string) (*pb.CreateServiceResponse, error) {
var chanLen = 0
errorsCh := make(chan error, 10)
//create tags
if in.Tags != nil && len(in.Tags) != 0 {
chanLen++
gopool.Go(func(_ context.Context) {
req := &pb.AddServiceTagsRequest{
ServiceId: serviceID,
Tags: in.Tags,
}
err := PutManyTags(ctx, req)
errorsCh <- err
})
}
// create instance
if in.Instances != nil && len(in.Instances) != 0 {
chanLen++
gopool.Go(func(_ context.Context) {
for _, ins := range in.Instances {
req := &pb.RegisterInstanceRequest{
Instance: ins,
}
req.Instance.ServiceId = serviceID
_, err := RegisterInstance(ctx, req)
errorsCh <- err
}
})
}
// handle result
var errMessages []string
for err := range errorsCh {
chanLen--
if err != nil {
errMessages = append(errMessages, err.Error())
}
if 0 == chanLen {
close(errorsCh)
}
}
if len(errMessages) != 0 {
return nil, pb.NewError(pb.ErrInvalidParams, fmt.Sprintf("errMessages: %v", errMessages))
}
log.Info(fmt.Sprintf("createServiceEx, serviceID: %s, operator: %s", serviceID, util.GetIPFromContext(ctx)))
return &pb.CreateServiceResponse{
ServiceId: serviceID,
}, nil
}
func hasServiceDetails(in *pb.CreateServiceRequest) bool {
if len(in.Rules) == 0 && len(in.Tags) == 0 && len(in.Instances) == 0 {
return false
}
return true
}
func checkServiceQuota(ctx context.Context) error {
if core.IsSCInstance(ctx) {
log.Debug("skip quota check")
return nil
}
return quotasvc.ApplyService(ctx, 1)
}
func UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) error {
remoteIP := util.GetIPFromContext(ctx)
if err := validator.ValidateDeleteServiceRequest(request); err != nil {
log.Error(fmt.Sprintf("delete micro-service[%s] failed, operator: %s", request.ServiceId, remoteIP), err)
return pb.NewError(pb.ErrInvalidParams, err.Error())
}
return datasource.GetMetadataManager().UnregisterService(ctx, request)
}
func GetService(ctx context.Context, in *pb.GetServiceRequest) (*pb.MicroService, error) {
remoteIP := util.GetIPFromContext(ctx)
if err := validator.ValidateGetServiceRequest(in); err != nil {
log.Error(fmt.Sprintf("get micro-service[%s] failed, operator: %s", in.ServiceId, remoteIP), err)
return nil, pb.NewError(pb.ErrInvalidParams, err.Error())
}
return datasource.GetMetadataManager().GetService(ctx, in)
}
func ListService(ctx context.Context, in *pb.GetServicesRequest) (*pb.GetServicesResponse, error) {
resp, err := datasource.GetMetadataManager().ListService(ctx, in)
if err == nil && len(resp.Services) > 0 {
resp.Services = datasource.RemoveGlobalServices(in.WithShared, util.ParseDomainProject(ctx), resp.Services)
}
return resp, err
}
func FindService(ctx context.Context, in *pb.MicroServiceKey) (*pb.GetServicesResponse, error) {
return datasource.GetMetadataManager().FindService(ctx, in)
}
func UnregisterManyService(ctx context.Context, request *pb.DelServicesRequest) (*pb.DelServicesResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
// 合法性检查
if err := validator.ValidateUnregisterManyService(request); err != nil {
log.Error(fmt.Sprintf("delete micro-services failed, operator: %s", remoteIP), err)
return nil, pb.NewError(pb.ErrInvalidParams, err.Error())
}
existFlag := map[string]bool{}
nuoMultiCount := 0
// 批量删除服务
serviceRespChan := make(chan *pb.DelServicesRspInfo, len(request.ServiceIds))
for _, serviceID := range request.ServiceIds {
//ServiceId重复性检查
if _, ok := existFlag[serviceID]; ok {
log.Warn(fmt.Sprintf("duplicate micro-service[%s] serviceID", serviceID))
continue
}
existFlag[serviceID] = true
nuoMultiCount++
//执行删除服务操作
gopool.Go(getDeleteServiceFunc(ctx, serviceID, request.Force, serviceRespChan))
}
//获取批量删除服务的结果
count := 0
responseCode := pb.ResponseSuccess
delServiceRspInfo := make([]*pb.DelServicesRspInfo, 0, len(serviceRespChan))
for serviceRespItem := range serviceRespChan {
count++
if len(serviceRespItem.ErrMessage) != 0 {
responseCode = pb.ErrInvalidParams
}
delServiceRspInfo = append(delServiceRspInfo, serviceRespItem)
//结果收集over,关闭通道
if count == nuoMultiCount {
close(serviceRespChan)
}
}
log.Info(fmt.Sprintf("batch delete micro-services by serviceIDs[%d]: %v, result code: %d, operator: %s",
len(request.ServiceIds), request.ServiceIds, responseCode, remoteIP))
return &pb.DelServicesResponse{
Services: delServiceRspInfo,
}, nil
}
func getDeleteServiceFunc(ctx context.Context, serviceID string, force bool, serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context) {
return func(_ context.Context) {
serviceRst := &pb.DelServicesRspInfo{
ServiceId: serviceID,
ErrMessage: "",
}
err := UnregisterService(ctx, &pb.DeleteServiceRequest{
ServiceId: serviceID,
Force: force,
})
if err != nil {
serviceRst.ErrMessage = err.Error()
}
serviceRespChan <- serviceRst
}
}
func ExistService(ctx context.Context, in *pb.GetExistenceRequest) (string, error) {
remoteIP := util.GetIPFromContext(ctx)
if err := validator.ValidateGetServiceExistenceRequest(in); err != nil {
serviceFlag := util.StringJoin([]string{in.Environment, in.AppId, in.ServiceName, in.Version}, "/")
log.Error(fmt.Sprintf("micro-service[%s] exist failed, operator: %s", serviceFlag, remoteIP), err)
return "", pb.NewError(pb.ErrInvalidParams, err.Error())
}
return datasource.GetMetadataManager().ExistService(ctx, in)
}
func PutServiceProperties(ctx context.Context, request *pb.UpdateServicePropsRequest) error {
remoteIP := util.GetIPFromContext(ctx)
if err := validator.ValidateUpdateServicePropsRequest(request); err != nil {
log.Error(fmt.Sprintf("update service[%s] properties failed, operator: %s",
request.ServiceId, remoteIP), err)
return pb.NewError(pb.ErrInvalidParams, err.Error())
}
return datasource.GetMetadataManager().PutServiceProperties(ctx, request)
}
func ServiceUsage(ctx context.Context, request *pb.GetServiceCountRequest) (int64, error) {
resp, err := datasource.GetMetadataManager().CountService(ctx, request)
if err != nil {
return 0, err
}
return resp.Count, nil
}