datasource/etcd/sd/servicecenter/syncer.go
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
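
// Package servicecenter implements the discovery cache source that
// aggregates and syncs microservice data from other service center clusters.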
package servicecenter

import (
"context"
"fmt"
"sync"
"time"
pb "github.com/go-chassis/cari/discovery"
"github.com/apache/servicecomb-service-center/datasource/etcd"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/alarm"
"github.com/go-chassis/foundation/gopool"
)
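
// singleton Syncer instance, created once by GetOrCreateSyncer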
var (
syncer *Syncer
syncerOnce sync.Once
)
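
// Syncer keeps the local caches consistent with the other service center
// clusters: it periodically pulls their dump caches and replays the
// differences into the registered cachers as create/update/delete events.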
type Syncer struct {
Client *SCClientAggregate
cachers map[kvstore.Type]*Cacher
}
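
// Initialize prepares the cacher registry and the aggregated service center client.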
func (c *Syncer) Initialize() {
c.cachers = make(map[kvstore.Type]*Cacher)
c.Client = GetOrCreateSCClient()
}
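
// Sync pulls the remote clusters' caches once and diffs them against the
// local cachers. Clusters that returned an error are kept in errs and are
// skipped during the diff so that no wrong events are published for them.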
func (c *Syncer) Sync(ctx context.Context) {
cache, errs := c.Client.GetScCache(ctx)
if len(errs) > 0 {
err := fmt.Errorf("%v", errs)
log.Error("Sync catches errors", err)
err = alarm.Raise(alarm.IDBackendConnectionRefuse,
alarm.AdditionalContext(err.Error()))
if err != nil {
log.Error("", err)
}
if cache == nil {
return
}
}
	if len(errs) == 0 {
		// all clusters responded, so the connection-refused alarm can be cleared
		if err := alarm.Clear(alarm.IDBackendConnectionRefuse); err != nil {
			log.Error("", err)
		}
	}
// microservice
serviceCacher, ok := c.cachers[sd.TypeService]
if ok {
c.check(serviceCacher, &cache.Microservices, errs)
}
indexCacher, ok := c.cachers[sd.TypeServiceIndex]
if ok {
c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, errs, c.logConflictFunc)
}
aliasCacher, ok := c.cachers[sd.TypeServiceAlias]
if ok {
c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, errs, c.logConflictFunc)
}
// microservice meta
tagCacher, ok := c.cachers[sd.TypeServiceTag]
if ok {
c.check(tagCacher, &cache.Tags, errs)
}
depRuleCacher, ok := c.cachers[sd.TypeDependencyRule]
if ok {
c.check(depRuleCacher, &cache.DependencyRules, errs)
}
schemaSummaryCacher, ok := c.cachers[sd.TypeSchemaSummary]
if ok {
c.check(schemaSummaryCacher, &cache.Summaries, errs)
}
// instance
instCacher, ok := c.cachers[sd.TypeInstance]
if ok {
c.check(instCacher, &cache.Instances, errs)
}
}
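
// check diffs a remote dump against the local cacher, ignoring key conflicts.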
func (c *Syncer) check(local *Cacher, remote dump.Getter, skipClusters map[string]error) {
c.checkWithConflictHandleFunc(local, remote, skipClusters, c.skipHandleFunc)
}
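
// checkWithConflictHandleFunc walks the remote dump and notifies the local
// cacher of creates and updates, then notifies deletes for local keys that
// no longer exist remotely. Entries owned by the current cluster are
// ignored, duplicate keys are handed to conflictHandleFunc, and entries
// from clusters listed in skipClusters are left untouched.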
func (c *Syncer) checkWithConflictHandleFunc(local *Cacher, remote dump.Getter, skipClusters map[string]error,
conflictHandleFunc func(origin *dump.KV, conflict dump.Getter, index int)) {
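	// exists records the first remote KV seen for each key so that
	// duplicates coming from other clusters are reported as conflicts.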
exists := make(map[string]*dump.KV)
remote.ForEach(func(i int, v *dump.KV) bool {
		// The remote result may contain the same data as the local cache of
		// the current service center, so skip it to keep the aggregation
		// result from being inflated.
if v.ClusterName == state.Configuration().ClusterName {
return true
}
if kv, ok := exists[v.Key]; ok {
conflictHandleFunc(kv, remote, i)
return true
}
exists[v.Key] = v
old := local.Cache().Get(v.Key)
newKv := &kvstore.KeyValue{
Key: util.StringToBytesWithNoCopy(v.Key),
Value: v.Value,
ModRevision: v.Rev,
ClusterName: v.ClusterName,
}
switch {
case old == nil:
newKv.Version = 1
newKv.CreateRevision = v.Rev
local.Notify(pb.EVT_CREATE, v.Key, newKv)
case old.ModRevision != v.Rev:
			// If the connection to a cluster failed, skip notifying changes from
			// that cluster to avoid publishing wrong change events for its KVs.
if err, ok := skipClusters[old.ClusterName]; ok {
log.Error(fmt.Sprintf("cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s",
old.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key), err)
break
}
newKv.Version = 1 + old.Version
newKv.CreateRevision = old.CreateRevision
local.Notify(pb.EVT_UPDATE, v.Key, newKv)
}
return true
})
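	// entries that remain only in the local cache were deleted remotely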
var deletes []*kvstore.KeyValue
local.Cache().ForEach(func(key string, v *kvstore.KeyValue) (next bool) {
var exist bool
		remote.ForEach(func(_ int, rkv *dump.KV) bool {
			if rkv.ClusterName == state.Configuration().ClusterName {
				return true
			}
			exist = rkv.Key == key
			return !exist
		})
if !exist {
if err, ok := skipClusters[v.ClusterName]; ok {
log.Error(fmt.Sprintf("cluster[%s] temporarily unavailable, skip event %s %s",
v.ClusterName, pb.EVT_DELETE, v.Key), err)
return true
}
deletes = append(deletes, v)
}
return true
})
for _, v := range deletes {
local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v)
}
}
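
// skipHandleFunc is the default conflict handler; it ignores conflicts.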
func (c *Syncer) skipHandleFunc(origin *dump.KV, conflict dump.Getter, index int) {
}
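
// logConflictFunc warns when the same service index or alias key resolves
// to different service IDs in different clusters.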
func (c *Syncer) logConflictFunc(origin *dump.KV, conflict dump.Getter, index int) {
	switch slice := conflict.(type) {
	case *dump.MicroserviceIndexSlice:
		keyValue := (*slice)[index]
		if serviceID := origin.Value.(string); keyValue.Value != serviceID {
			key := path.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(keyValue.Key))
			log.Warn(fmt.Sprintf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
				keyValue.ClusterName, keyValue.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
				serviceID, origin.ClusterName))
		}
	case *dump.MicroserviceAliasSlice:
		keyValue := (*slice)[index]
		if serviceID := origin.Value.(string); keyValue.Value != serviceID {
			key := path.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(keyValue.Key))
			log.Warn(fmt.Sprintf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
				keyValue.ClusterName, keyValue.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
				serviceID, origin.ClusterName))
		}
	}
}
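
// loop triggers Sync periodically until the context is done. The first run
// is delayed by minWaitInterval; later runs follow AutoSyncInterval, and a
// zero interval disables the periodic sync.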
func (c *Syncer) loop(ctx context.Context) {
cfg := etcd.Configuration()
select {
case <-ctx.Done():
case <-time.After(minWaitInterval):
c.Sync(ctx)
d := cfg.AutoSyncInterval
if d == 0 {
return
}
loop:
for {
select {
case <-ctx.Done():
break loop
case <-time.After(d):
// TODO support watching sc
c.Sync(ctx)
}
}
}
log.Debug("service center clusters syncer is stopped")
}

// AddCacher registers the local cacher of a resource type.
// unsafe: the registry map is not guarded by a lock for concurrent use.
func (c *Syncer) AddCacher(t kvstore.Type, cacher *Cacher) {
c.cachers[t] = cacher
}
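
// Run initializes the syncer and starts the background sync loop.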
func (c *Syncer) Run() {
c.Initialize()
gopool.Go(c.loop)
}
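
// GetOrCreateSyncer returns the singleton Syncer, starting it on first call.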
func GetOrCreateSyncer() *Syncer {
syncerOnce.Do(func() {
syncer = &Syncer{}
syncer.Run()
})
return syncer
}