datasource/etcd/sync.go (368 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package etcd import ( "context" "encoding/json" "errors" "fmt" "strings" "github.com/go-chassis/cari/discovery" crbac "github.com/go-chassis/cari/rbac" "github.com/little-cui/etcdadpt" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/etcd/path" esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync" "github.com/apache/servicecomb-service-center/pkg/log" putil "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/config" ) const ( SyncAllKey = "/cse-sr/sync-all" SyncAllLockKey = "/cse-sr/sync-all-lock" // one minutes defaultLockTime = 60 ) var ( ErrWithoutDomainProject = errors.New("key without domain and project") ) type SyncManager struct { } // SyncAll will list all services,accounts,roles,schemas,tags,deps and use tasks to store func (s *SyncManager) SyncAll(ctx context.Context) error { enable := config.GetBool("sync.enableOnStart", false) if !enable { return nil } ctx = putil.SetContext(ctx, putil.CtxEnableSync, "1") exist, err := etcdadpt.Exist(ctx, SyncAllKey) if err != nil { return err } if exist { log.Info(fmt.Sprintf("%s key already exists, do not need to do tasks", SyncAllKey)) return datasource.ErrSyncAllKeyExists } lock, err := etcdadpt.TryLock(SyncAllLockKey, defaultLockTime) if err != nil || lock == nil { log.Info(fmt.Sprintf("%s lock not acquired", SyncAllLockKey)) return nil } defer func(lock *etcdadpt.DLock) { err := lock.Unlock() if err != nil { log.Error(fmt.Sprintf("fail to unlock the %s key", SyncAllLockKey), err) } }(lock) err = syncAllRoles(ctx) if err != nil { return err } err = syncAllAccounts(ctx) if err != nil { return err } err = syncAllServices(ctx) if err != nil { return err } err = syncAllTags(ctx) if err != nil { return err } err = syncAllSchemas(ctx) if err != nil { return err } err = syncAllDependencies(ctx) if err != nil { return err } return etcdadpt.Put(ctx, SyncAllKey, "1") } func syncAllAccounts(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) putil.SetDomain(ctx, "") putil.SetProject(ctx, "") for _, v := range kvs { a := &crbac.Account{} err = json.Unmarshal(v.Value, a) if err != nil { log.Error("fail to unmarshal account ", err) return err } opt, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a) if err != nil { log.Error("fail to create sync opts", err) return err } syncOpts = append(syncOpts, opt...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to account tasks", err) } return err } func syncAllRoles(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) putil.SetDomain(ctx, "") putil.SetProject(ctx, "") for _, v := range kvs { r := &crbac.Role{} err = json.Unmarshal(v.Value, r) if err != nil { log.Error("fail to unmarshal role", err) return err } opt, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r) if err != nil { log.Error("fail to create sync opts", err) return err } syncOpts = append(syncOpts, opt...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to role tasks", err) } return err } // syncAllTags func use kv resource task to store tags func syncAllTags(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceTagRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { domain, project, err := getDomainProject(string(kv.Key), path.GetServiceTagRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)})) if err != nil { log.Error("fail to create tag opts", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create tag tasks", err) } return err } func syncAllServices(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { service := &discovery.MicroService{} err := json.Unmarshal(kv.Value, service) if err != nil { log.Error("fail to unmarshal service", err) return err } domain, project, err := getDomainProject(string(kv.Key), path.GetServiceRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) request := &discovery.CreateServiceRequest{ Service: service, } opts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request) if err != nil { log.Error("fail to create service task", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create service tasks", err) } return err } // syncAllSchemas func use kv resource task to store schemas func syncAllSchemas(ctx context.Context) error { putil.SetDomain(ctx, "") putil.SetProject(ctx, "") err := syncAllServiceSchemas(ctx) if err != nil { return err } err = syncAllServiceSchemaRefs(ctx) if err != nil { return err } err = syncAllServiceSchemaContents(ctx) if err != nil { return err } return syncAllServiceSchemaSummaries(ctx) } func syncAllServiceSchemas(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)})) if err != nil { log.Error("fail to create schema opts", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create schema tasks", err) } return err } func syncAllServiceSchemaRefs(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRefRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRefRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)})) if err != nil { log.Error("fail to create schema ref opts", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create schema ref tasks", err) } return err } func syncAllServiceSchemaContents(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaContentRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaContentRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)})) if err != nil { log.Error("fail to create schema content opts", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create schema content tasks", err) } return err } func syncAllServiceSchemaSummaries(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaSummaryRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaSummaryRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)})) if err != nil { log.Error("fail to create schema summary opts", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create schema summary tasks", err) } return err } func syncAllDependencies(ctx context.Context) error { kvs, _, err := etcdadpt.List(ctx, path.GetServiceDependencyQueueRootKey("")) if err != nil { return err } syncOpts := make([]etcdadpt.OpOptions, 0) for _, kv := range kvs { domain, project, err := getDomainProject(string(kv.Key), path.GetServiceDependencyQueueRootKey("")) if err != nil { log.Error("fail to get domain and project", err) return err } putil.SetDomain(ctx, domain) putil.SetProject(ctx, project) opts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)})) if err != nil { log.Error("fail to create dep opts", err) return err } syncOpts = append(syncOpts, opts...) } err = etcdadpt.Txn(ctx, syncOpts) if err != nil { log.Error("fail to create dep tasks", err) } return err } func getDomainProject(key string, prefixKey string) (domain string, project string, err error) { splitKey := strings.Split(key, prefixKey) if len(splitKey) != 2 { return "", "", ErrWithoutDomainProject } suffixKey := splitKey[len(splitKey)-1] splitStr := strings.Split(suffixKey, "/") if len(splitStr) < 2 { return "", "", ErrWithoutDomainProject } domain = splitStr[0] project = splitStr[1] return }