datasource/etcd/sd/servicecenter/aggregate.go (187 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package servicecenter
import (
"context"
"crypto/tls"
"fmt"
"strings"
"sync"
"github.com/apache/servicecomb-service-center/client"
"github.com/apache/servicecomb-service-center/datasource/etcd"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/plugin/security/tlsconf"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/little-cui/etcdadpt"
)
var (
scClient *SCClientAggregate
clientOnce sync.Once
clientTLS *tls.Config
)
type SCClientAggregate []*client.Client
func getClientTLS() (*tls.Config, error) {
if clientTLS != nil {
return clientTLS, nil
}
var err error
clientTLS, err = tlsconf.ClientConfig()
return clientTLS, err
}
func (c *SCClientAggregate) GetScCache(ctx context.Context) (*dump.Cache, map[string]error) {
var caches *dump.Cache
errs := make(map[string]error)
for _, client := range *c {
cache, err := client.GetScCache(ctx)
if err != nil {
errs[client.Cfg.Name] = err
continue
}
if caches == nil {
caches = &dump.Cache{}
}
c.cacheAppend(client.Cfg.Name, &caches.Microservices, &cache.Microservices)
c.cacheAppend(client.Cfg.Name, &caches.Indexes, &cache.Indexes)
c.cacheAppend(client.Cfg.Name, &caches.Aliases, &cache.Aliases)
c.cacheAppend(client.Cfg.Name, &caches.Tags, &cache.Tags)
c.cacheAppend(client.Cfg.Name, &caches.DependencyRules, &cache.DependencyRules)
c.cacheAppend(client.Cfg.Name, &caches.Summaries, &cache.Summaries)
c.cacheAppend(client.Cfg.Name, &caches.Instances, &cache.Instances)
}
return caches, errs
}
func (c *SCClientAggregate) cacheAppend(name string, setter dump.Setter, getter dump.Getter) {
getter.ForEach(func(_ int, v *dump.KV) bool {
if len(v.ClusterName) == 0 || v.ClusterName == etcdadpt.DefaultClusterName {
v.ClusterName = name
}
setter.SetValue(v)
return true
})
}
func (c *SCClientAggregate) GetSchemasByServiceID(ctx context.Context, domainProject, serviceID string) (*kvstore.Response, *errsvc.Error) {
dp := strings.Split(domainProject, "/")
var response kvstore.Response
for _, client := range *c {
schemas, err := client.GetSchemasByServiceID(ctx, dp[0], dp[1], serviceID)
if err != nil && err.InternalError() {
log.Error(fmt.Sprintf("get schema by serviceID[%s/%s] failed", domainProject, serviceID), err)
continue
}
if schemas == nil {
continue
}
response.Count = int64(len(schemas))
for _, schema := range schemas {
response.Kvs = append(response.Kvs, &kvstore.KeyValue{
Key: []byte(path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)),
Value: util.StringToBytesWithNoCopy(schema.Schema),
ModRevision: 0,
ClusterName: client.Cfg.Name,
})
}
return &response, nil
}
return &response, nil
}
func (c *SCClientAggregate) GetSchemaBySchemaID(ctx context.Context, domainProject, serviceID, schemaID string) (*kvstore.Response, *errsvc.Error) {
dp := strings.Split(domainProject, "/")
var response kvstore.Response
for _, client := range *c {
schema, err := client.GetSchemaBySchemaID(ctx, dp[0], dp[1], serviceID, schemaID)
if err != nil && err.InternalError() {
log.Error(fmt.Sprintf("get schema by serviceID[%s/%s] failed", domainProject, serviceID), err)
continue
}
if schema == nil {
continue
}
response.Count = 1
response.Kvs = append(response.Kvs, &kvstore.KeyValue{
Key: []byte(path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)),
Value: util.StringToBytesWithNoCopy(schema.Schema),
ModRevision: 0,
ClusterName: client.Cfg.Name,
})
return &response, nil
}
return &response, nil
}
func (c *SCClientAggregate) GetInstancesByServiceID(ctx context.Context, domain, project, providerID, consumerID string) (*kvstore.Response, *errsvc.Error) {
var response kvstore.Response
for _, client := range *c {
insts, err := client.GetInstancesByServiceID(ctx, domain, project, providerID, consumerID)
if err != nil && err.InternalError() {
log.Error(fmt.Sprintf("consumer[%s] get provider[%s/%s/%s] instances failed", consumerID, domain, project, providerID), err)
continue
}
if insts == nil {
continue
}
response.Count = int64(len(insts))
for _, instance := range insts {
response.Kvs = append(response.Kvs, &kvstore.KeyValue{
Key: []byte(path.GenerateInstanceKey(domain+"/"+project, providerID, instance.InstanceId)),
Value: instance,
ModRevision: 0,
ClusterName: client.Cfg.Name,
})
}
}
return &response, nil
}
func (c *SCClientAggregate) GetInstanceByInstanceID(ctx context.Context, domain, project, providerID, instanceID, consumerID string) (*kvstore.Response, *errsvc.Error) {
var response kvstore.Response
for _, client := range *c {
instance, err := client.GetInstanceByInstanceID(ctx, domain, project, providerID, instanceID, consumerID)
if err != nil && err.InternalError() {
log.Error(fmt.Sprintf("consumer[%s] get provider[%s/%s/%s] instances failed", consumerID, domain, project, providerID), err)
continue
}
if instance == nil {
continue
}
response.Count = 1
response.Kvs = append(response.Kvs, &kvstore.KeyValue{
Key: []byte(path.GenerateInstanceKey(domain+"/"+project, providerID, instance.InstanceId)),
Value: instance,
ModRevision: 0,
ClusterName: client.Cfg.Name,
})
return &response, nil
}
return &response, nil
}
func GetOrCreateSCClient() *SCClientAggregate {
clientOnce.Do(func() {
scClient = &SCClientAggregate{}
// TODO should not use the etcd config
cfg := etcd.Configuration()
clusters, err := etcdadpt.ListCluster(context.Background())
if err != nil {
log.Fatal("GetOrCreateSCClient failed", err)
}
for name, endpoints := range clusters {
if len(name) == 0 || name == cfg.ClusterName {
continue
}
client, err := client.NewSCClient(client.Config{Name: name, Endpoints: endpoints})
if err != nil {
log.Error(fmt.Sprintf("new service center[%s]%v client failed", name, endpoints), err)
continue
}
client.Timeout = cfg.RequestTimeOut
// TLS
if strings.Contains(endpoints[0], "https") {
client.TLS, err = getClientTLS()
if err != nil {
log.Error(fmt.Sprintf("get service center[%s]%v tls config failed", name, endpoints), err)
continue
}
}
*scClient = append(*scClient, client)
log.Info(fmt.Sprintf("new service center[%s]%v client", name, endpoints))
}
})
return scClient
}