datasource/etcd/util.go (253 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package etcd import ( "context" "fmt" "strings" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/cari/pkg/errsvc" "github.com/go-chassis/foundation/gopool" "github.com/little-cui/etcdadpt" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/etcd/path" "github.com/apache/servicecomb-service-center/datasource/etcd/sd" "github.com/apache/servicecomb-service-center/datasource/etcd/sync" serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" ) // schema func getSchemaSummary(ctx context.Context, domainProject string, serviceID string, schemaID string) (string, error) { key := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schemaID) resp, err := sd.SchemaSummary().Search(ctx, etcdadpt.WithStrKey(key), ) if err != nil { log.Error(fmt.Sprintf("get schema[%s/%s] summary failed", serviceID, schemaID), err) return "", err } if len(resp.Kvs) == 0 { return "", nil } return resp.Kvs[0].Value.(string), nil } func getSchemasFromDatabase(ctx context.Context, domainProject string, serviceID string) ([]*pb.Schema, error) { key := path.GenerateServiceSchemaKey(domainProject, serviceID, "") resp, err := sd.Schema().Search(ctx, etcdadpt.WithPrefix(), etcdadpt.WithStrKey(key)) if err != nil { log.Error(fmt.Sprintf("get service[%s]'s schema failed", serviceID), err) return nil, err } schemas := make([]*pb.Schema, 0, len(resp.Kvs)) for _, kv := range resp.Kvs { key := util.BytesToStringWithNoCopy(kv.Key) tmp := strings.Split(key, "/") schemaID := tmp[len(tmp)-1] schema := util.BytesToStringWithNoCopy(kv.Value.([]byte)) schemaStruct := &pb.Schema{ SchemaId: schemaID, Schema: schema, } schemas = append(schemas, schemaStruct) } return schemas, nil } func checkSchemaInfoExist(ctx context.Context, key string) (bool, error) { opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithCountOnly()) resp, errDo := sd.Schema().Search(ctx, opts...) if errDo != nil { return false, errDo } if resp.Count == 0 { return false, nil } return true, nil } func putSchema(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) { opts := make([]etcdadpt.OpOptions, 0) key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId) onPutOpt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema)) opts = append(opts, onPutOpt) syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Schema, sync.WithOpts(map[string]string{"key": key})) if err != nil { log.Error("fail to create update opts", err) return opts, err } opts = append(opts, syncOpts...) keySummary := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schema.SchemaId) onPutOpt = etcdadpt.OpPut(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary)) opts = append(opts, onPutOpt) syncOpts, err = sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary})) if err != nil { log.Error("fail to create update opts", err) return opts, err } opts = append(opts, syncOpts...) return opts, nil } func deleteSchema(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) { opts := make([]etcdadpt.OpOptions, 0) key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId) onDelOpt := etcdadpt.OpDel(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema)) opts = append(opts, onDelOpt) syncOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, key, schema.Schema, sync.WithOpts(map[string]string{"key": key})) if err != nil { log.Error("fail to create update opts", err) return opts, err } opts = append(opts, syncOpts...) keySummary := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schema.SchemaId) onDelOpt = etcdadpt.OpDel(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary)) opts = append(opts, onDelOpt) syncOpts, err = sync.GenDeleteOpts(ctx, datasource.ResourceKV, keySummary, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary})) if err != nil { log.Error("fail to create update opts", err) return opts, err } opts = append(opts, syncOpts...) return opts, nil } func isExistSchemaID(service *pb.MicroService, schemas []*pb.Schema) bool { serviceSchemaIds := service.Schemas for _, schema := range schemas { if !util.SliceHave(serviceSchemaIds, schema.SchemaId) { log.Error(fmt.Sprintf("schema[%s/%s] does not exist schemaID", service.ServiceId, schema.SchemaId), nil) return false } } return true } func commitSchemaInfo(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) { if len(schema.Summary) != 0 { opts, err := putSchema(ctx, domainProject, serviceID, schema) return opts, err } opts := make([]etcdadpt.OpOptions, 0) key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId) opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema)) opts = append(opts, opt) syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Schema, sync.WithOpts(map[string]string{"key": key})) if err != nil { log.Error("fail to create update opts", err) return opts, err } opts = append(opts, syncOpts...) return opts, nil } func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) { return func(_ context.Context) { hbRst := &pb.InstanceHbRst{ ServiceId: element.ServiceId, InstanceId: element.InstanceId, ErrMessage: "", } _, _, err := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId) if err != nil { hbRst.ErrMessage = err.Error() log.Error(fmt.Sprintf("heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId), err) } instancesHbRst <- hbRst } } func revokeInstance(ctx context.Context, domainProject string, serviceID string, instanceID string) *errsvc.Error { leaseID, err := serviceUtil.GetLeaseID(ctx, domainProject, serviceID, instanceID) if err != nil { return pb.NewError(pb.ErrUnavailableBackend, err.Error()) } if leaseID == -1 { return pb.NewError(pb.ErrInstanceNotExists, "Instance's leaseId not exist.") } err = etcdadpt.Instance().LeaseRevoke(ctx, leaseID) if err != nil { if err == etcdadpt.ErrLeaseNotFound { return pb.NewError(pb.ErrInstanceNotExists, err.Error()) } return pb.NewError(pb.ErrUnavailableBackend, err.Error()) } return nil } func statistics(ctx context.Context, withShared bool) (*pb.Statistics, error) { result := &pb.Statistics{ Services: &pb.StService{}, Instances: &pb.StInstance{}, Apps: &pb.StApp{}, } domainProject := util.ParseDomainProject(ctx) opts := serviceUtil.FromContext(ctx) // services key := path.GetServiceIndexRootKey(domainProject) + "/" svcOpts := append(opts, etcdadpt.WithStrKey(key), etcdadpt.WithPrefix()) respSvc, err := sd.ServiceIndex().Search(ctx, svcOpts...) if err != nil { return nil, err } var svcIDs []string var svcKeys []*pb.MicroServiceKey for _, keyValue := range respSvc.Kvs { key := path.GetInfoFromSvcIndexKV(keyValue.Key) svcKeys = append(svcKeys, key) svcIDs = append(svcIDs, keyValue.Value.(string)) } svcIDToNonVerKey := datasource.SetStaticServices(result, svcKeys, svcIDs, withShared) respGetInstanceCountByDomain := make(chan datasource.GetInstanceCountByDomainResponse, 1) gopool.Go(func(_ context.Context) { getInstanceCountByDomain(ctx, svcIDToNonVerKey, respGetInstanceCountByDomain) }) // instance key = path.GetInstanceRootKey(domainProject) + "/" instOpts := append(opts, etcdadpt.WithStrKey(key), etcdadpt.WithPrefix(), etcdadpt.WithKeyOnly()) respIns, err := sd.Instance().Search(ctx, instOpts...) if err != nil { return nil, err } var instIDs []string for _, keyValue := range respIns.Kvs { serviceID, _, _ := path.GetInfoFromInstKV(keyValue.Key) instIDs = append(instIDs, serviceID) } datasource.SetStaticInstances(result, svcIDToNonVerKey, instIDs) data := <-respGetInstanceCountByDomain close(respGetInstanceCountByDomain) if data.Err != nil { return nil, data.Err } result.Instances.CountByDomain = data.CountByDomain return result, nil } func getInstanceCountByDomain(ctx context.Context, svcIDToNonVerKey map[string]string, resp chan datasource.GetInstanceCountByDomainResponse) { domainID := util.ParseDomain(ctx) key := path.GetInstanceRootKey(domainID) + "/" instOpts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix(), etcdadpt.WithKeyOnly()) respIns, err := sd.Instance().Search(ctx, instOpts...) ret := datasource.GetInstanceCountByDomainResponse{ Err: err, } if err != nil { log.Error(fmt.Sprintf("get number of instances by domain[%s]", domainID), err) } else { for _, keyValue := range respIns.Kvs { serviceID, _, _ := path.GetInfoFromInstKV(keyValue.Key) _, ok := svcIDToNonVerKey[serviceID] if !ok { continue } ret.CountByDomain++ } } resp <- ret } // dep util func toDependencyFilterOptions(in *pb.GetDependenciesRequest) (opts []serviceUtil.DependencyRelationFilterOption) { if in.SameDomain { opts = append(opts, serviceUtil.WithSameDomainProject()) } if in.NoSelf { opts = append(opts, serviceUtil.WithoutSelfDependency()) } return opts }