in syncer/service/event/manager.go [172:224]
func (e *ManagerImpl) resultHandle(ctx context.Context) {
for {
select {
case res, ok := <-e.result:
if !ok {
continue
}
id := res.ID
et, ok := e.cache.LoadAndDelete(id)
if !ok {
log.Warn(fmt.Sprintf("%s event not exist", id))
continue
}
event := et.(*Event).Event
r, result := resource.New(event)
if result != nil {
log.Warn(fmt.Sprintf("new resource failed, %s", result.Message))
continue
}
if res.Error != nil {
log.Error(fmt.Sprintf("result is error %s", event.Flag()), res.Error)
if r.CanDrop() {
log.Warn(fmt.Sprintf("drop event %s", event.Flag()))
continue
}
log.Info(fmt.Sprintf("resend event %s", event.Flag()))
e.Send(&Event{
Event: event,
})
continue
}
toSendEvent, err := r.FailHandle(ctx, res.Data.Code)
if err != nil {
log.Warn(fmt.Sprintf("event %s fail handle failed, %s", event.Flag(), err.Error()))
continue
}
if toSendEvent != nil {
log.Info(fmt.Sprintf("resend event %s", toSendEvent.Flag()))
e.Send(&Event{
Event: toSendEvent,
})
}
case <-ctx.Done():
log.Info("result handle worker is closed")
return
}
}
}