syncer/service/replicator/replicator.go (181 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package replicator import ( "context" "fmt" "github.com/go-chassis/foundation/gopool" "github.com/go-chassis/go-chassis/v2/server/restful" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "github.com/apache/servicecomb-service-center/client" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/rpc" "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/plugin/security/cipher" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" syncerclient "github.com/apache/servicecomb-service-center/syncer/client" "github.com/apache/servicecomb-service-center/syncer/config" "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource" ) const ( schema = "grpc" serviceName = "syncer" ) const ( reservedSize = 512 * 1024 maxSize = 10*1024*1024 - reservedSize ) var ( manager = NewManager(make(map[string]struct{}, 1000)) ) var ( conn *grpc.ClientConn peerToken = "" ) func Work() error { err := InitSyncClient() if err != nil { return err } gopool.Go(func(ctx context.Context) { <-ctx.Done() Close() }) resource.InitManager() return err } func InitSyncClient() error { peer := config.GetConfig().Sync.Peers[0] log.Info(fmt.Sprintf("peer is %v", peer)) var err error conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{ Addrs: peer.Endpoints, Scheme: schema, ServiceName: serviceName, TLSConfig: syncerclient.RPClientConfig(), }) if err != nil { log.Error("get rpc client failed", err) return err } if !config.GetConfig().Sync.RbacEnabled { return nil } peerToken, err = cipher.Decrypt(peer.Token) if err != nil { log.Error("decrypt peer token failed, use original content", err) peerToken = peer.Token } return nil } func Close() { if conn == nil { return } err := conn.Close() if err != nil { log.Error("close conn failed", err) } } func Manager() Replicator { return manager } // Replicator define replicator manager, receive events from event manager // and send events to remote syncer type Replicator interface { Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result } func NewManager(cache map[string]struct{}) Replicator { return &replicatorManager{ cache: cache, } } type replicatorManager struct { cache map[string]struct{} } func (r *replicatorManager) Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) { return r.replicate(ctx, el) } func pageEvents(source *v1sync.EventList, max int) []*v1sync.EventList { els := make([]*v1sync.EventList, 0, 5) size := 0 el := &v1sync.EventList{ Events: make([]*v1sync.Event, 0, 20), } for _, event := range source.Events { lv := len(event.Value) if size+lv < max { el.Events = append(el.Events, event) size += lv continue } log.Info(fmt.Sprintf("size is %d", size)) els = append(els, el) el = &v1sync.EventList{ Events: make([]*v1sync.Event, 0, 20), } el.Events = append(el.Events, event) size = lv } log.Info(fmt.Sprintf("size is %d", size)) els = append(els, el) return els } func (r *replicatorManager) replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) { log.Info(fmt.Sprintf("start replicate events %d", len(el.Events))) set := client.NewSet(conn) els := pageEvents(el, maxSize) result := &v1sync.Results{ Results: make(map[string]*v1sync.Result, len(el.Events)), } log.Info(fmt.Sprintf("page count %d to sync", len(els))) if config.GetConfig().Sync.RbacEnabled { ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ restful.HeaderAuth: "Bearer " + peerToken, })) } for _, in := range els { res, err := set.EventServiceClient.Sync(ctx, in) if err != nil { return nil, err } log.Info(fmt.Sprintf("replicate events success, count is %d", len(in.Events))) for k, v := range res.Results { log.Info(fmt.Sprintf("replicate event %s, %v", k, v)) result.Results[k] = v } } log.Info(fmt.Sprintf("replicate events success %d", len(result.Results))) return result, nil } func (r *replicatorManager) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result { if el == nil || len(el.Events) == 0 { return []*resource.Result{} } results := make([]*resource.Result, 0, len(el.Events)) for _, event := range el.Events { log.Info(fmt.Sprintf("start handle event %s", event.Flag())) r, result := resource.New(event) if result != nil { results = append(results, result.WithEventID(event.Id)) continue } ctx = util.SetDomain(ctx, event.Opts[string(util.CtxDomain)]) ctx = util.SetProject(ctx, event.Opts[string(util.CtxProject)]) result = r.LoadCurrentResource(ctx) if result != nil { results = append(results, result.WithEventID(event.Id)) continue } result = r.NeedOperate(ctx) if result != nil { results = append(results, result.WithEventID(event.Id)) continue } result = r.Operate(ctx) results = append(results, result.WithEventID(event.Id)) log.Info(fmt.Sprintf("operate resource, event: %s, result: %s", event.Flag(), result.Flag())) } for _, result := range results { log.Info(fmt.Sprintf("handle event result %s", result.Flag())) } return results }