syncer/service/event/manager.go (259 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package event import ( "context" "fmt" "sort" "sync" "time" "github.com/apache/servicecomb-service-center/pkg/log" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" "github.com/apache/servicecomb-service-center/syncer/metrics" "github.com/apache/servicecomb-service-center/syncer/service/replicator" "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource" "github.com/go-chassis/foundation/gopool" ) const ( DefaultInternal = 500 * time.Millisecond eventChanSize = 1000 batchEventChanSize = 100 resChanSize = 1000 eventSliceSize = 100 ) var m Manager type Event struct { *v1sync.Event CanNotAbandon bool Result chan<- *Result } type Result struct { ID string Data *v1sync.Result Error error } func Work() { m = NewManager() m.HandleEvent() m.HandleResult() } func GetManager() Manager { return m } type ManagerOption func(*managerOptions) type managerOptions struct { internal time.Duration replicator replicator.Replicator } func ManagerInternal(i time.Duration) ManagerOption { return func(options *managerOptions) { options.internal = i } } func toManagerOptions(os ...ManagerOption) *managerOptions { mo := new(managerOptions) mo.internal = DefaultInternal mo.replicator = replicator.Manager() for _, o := range os { o(mo) } return mo } func Replicator(r replicator.Replicator) ManagerOption { return func(options *managerOptions) { options.replicator = r } } func NewManager(os ...ManagerOption) Manager { mo := toManagerOptions(os...) em := &ManagerImpl{ events: make(chan *Event, eventChanSize), batchEvents: make(chan []*Event, batchEventChanSize), result: make(chan *Result, resChanSize), internal: mo.internal, Replicator: mo.replicator, } return em } // Sender send events type Sender interface { Send(et *Event) } // Manager manage events, including send events, handle events and handle result type Manager interface { Sender HandleEvent() HandleResult() } type ManagerImpl struct { events chan *Event batchEvents chan []*Event internal time.Duration ticker *time.Ticker cache sync.Map result chan *Result Replicator replicator.Replicator } func (e *ManagerImpl) Send(et *Event) { if et.Result == nil { et.Result = e.result e.cache.Store(et.Id, et) } if e.checkThreshold(et) { return } e.events <- et } func (e *ManagerImpl) checkThreshold(et *Event) bool { metrics.PendingEventSet(int64(len(e.events))) if len(e.events) < cap(e.events) { return false } log.Warn(fmt.Sprintf("events reaches the limit %d", cap(e.events))) if et.CanNotAbandon { return false } log.Warn(fmt.Sprintf("drop event %s", et.Flag())) metrics.AbandonEventAdd() return true } func (e *ManagerImpl) HandleResult() { gopool.Go(func(ctx context.Context) { e.resultHandle(ctx) }) } func (e *ManagerImpl) resultHandle(ctx context.Context) { for { select { case res, ok := <-e.result: if !ok { continue } id := res.ID et, ok := e.cache.LoadAndDelete(id) if !ok { log.Warn(fmt.Sprintf("%s event not exist", id)) continue } event := et.(*Event).Event r, result := resource.New(event) if result != nil { log.Warn(fmt.Sprintf("new resource failed, %s", result.Message)) continue } if res.Error != nil { log.Error(fmt.Sprintf("result is error %s", event.Flag()), res.Error) if r.CanDrop() { log.Warn(fmt.Sprintf("drop event %s", event.Flag())) continue } log.Info(fmt.Sprintf("resend event %s", event.Flag())) e.Send(&Event{ Event: event, }) continue } toSendEvent, err := r.FailHandle(ctx, res.Data.Code) if err != nil { log.Warn(fmt.Sprintf("event %s fail handle failed, %s", event.Flag(), err.Error())) continue } if toSendEvent != nil { log.Info(fmt.Sprintf("resend event %s", toSendEvent.Flag())) e.Send(&Event{ Event: toSendEvent, }) } case <-ctx.Done(): log.Info("result handle worker is closed") return } } } func (e *ManagerImpl) Close() { e.ticker.Stop() close(e.result) } type syncEvents []*Event func (s syncEvents) Len() int { return len(s) } func (s syncEvents) Less(i, j int) bool { return s[i].Timestamp < s[j].Timestamp } func (s syncEvents) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (e *ManagerImpl) HandleEvent() { gopool.Go(func(ctx context.Context) { e.handleBatchEvents(ctx) }) gopool.Go(func(ctx context.Context) { e.readAndPackEvents(ctx) }) } func (e *ManagerImpl) readAndPackEvents(ctx context.Context) { events := make([]*Event, 0, eventSliceSize) e.ticker = time.NewTicker(e.internal) for { select { case <-e.ticker.C: if len(events) == 0 { continue } send := events[:] events = make([]*Event, 0, eventSliceSize) e.batchEvents <- send case event, ok := <-e.events: if !ok { return } events = append(events, event) if len(events) > 50 { send := events[:] events = make([]*Event, 0, eventSliceSize) e.batchEvents <- send } case <-ctx.Done(): e.Close() return } } } func (e *ManagerImpl) handleBatchEvents(ctx context.Context) { for { select { case send, ok := <-e.batchEvents: if !ok { return } e.handle(ctx, send) case <-ctx.Done(): e.Close() return } } } func (e *ManagerImpl) handle(ctx context.Context, es syncEvents) { sort.Sort(es) sendEvents := make([]*v1sync.Event, 0, len(es)) for _, event := range es { sendEvents = append(sendEvents, event.Event) } result, err := e.Replicator.Replicate(ctx, &v1sync.EventList{ Events: sendEvents, }) if err != nil { log.Error("replicate failed", err) result = &v1sync.Results{ Results: make(map[string]*v1sync.Result), } } for _, e := range es { e.Result <- &Result{ ID: e.Id, Data: result.Results[e.Id], Error: err, } } } // Send sends event to replicator func Send(e *Event) { log.Info(fmt.Sprintf("send event %s", e.Subject)) m.Send(e) }