func()

in pkg/schema/cache.go [149:242]


// Watcher starts sr.workerNum goroutines that consume metadata events from
// sr.eventCh and apply them to the repository's cached state.
//
// Each worker registers itself with sr.closer and exits when the registration
// is rejected, when sr.eventCh is closed, or when the closer signals shutdown.
// A panic raised while handling an event is recovered, logged and counted in
// sr.metrics.totalPanics; the affected worker then exits.
//
// An event whose handler returns an error other than schema.ErrClosed is
// logged, counted in sr.metrics.totalErrs and re-submitted through
// SendMetadataEvent from a separate goroutine.
func (sr *schemaRepo) Watcher() {
	for i := 0; i < sr.workerNum; i++ {
		go func() {
			if !sr.closer.AddReceiver() {
				return
			}
			defer func() {
				sr.closer.ReceiverDone()
				// Count a panic only when one was actually recovered; a regular
				// worker shutdown must not inflate the panic metric.
				if r := recover(); r != nil {
					sr.l.Warn().Interface("err", r).Msg("watching the events")
					sr.metrics.totalPanics.Inc(1)
				}
			}()
			for {
				select {
				case evt, more := <-sr.eventCh:
					if !more {
						return
					}
					if e := sr.l.Debug(); e.Enabled() {
						e.Interface("event", evt).Msg("received an event")
					}
					var err error
					switch evt.Typ {
					case EventAddOrUpdate:
						switch evt.Kind {
						case EventKindGroup:
							_, err = sr.storeGroup(evt.Metadata.GetMetadata())
							// Compare against the sentinel with errors.Is. errors.As
							// expects a pointer target and would panic on (or write
							// through) the sentinel value instead of matching it.
							if errors.Is(err, schema.ErrGRPCResourceNotFound) {
								err = nil
							}
						case EventKindResource:
							err = sr.storeResource(evt.Metadata)
						case EventKindIndexRule:
							indexRule := evt.Metadata.(*databasev1.IndexRule)
							sr.storeIndexRule(indexRule)
						case EventKindIndexRuleBinding:
							indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding)
							sr.storeIndexRuleBinding(indexRuleBinding)
						}
					case EventDelete:
						switch evt.Kind {
						case EventKindGroup:
							err = sr.deleteGroup(evt.Metadata.GetMetadata())
						case EventKindResource:
							sr.deleteResource(evt.Metadata.GetMetadata())
						case EventKindIndexRule:
							key := getKey(evt.Metadata.GetMetadata())
							sr.indexRuleMap.Delete(key)
						case EventKindIndexRuleBinding:
							indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding)
							group := indexRuleBinding.GetMetadata().GetGroup()
							// Forward map: keyed by the binding's subject, holding
							// a *sync.Map keyed by the binding's own metadata key.
							subjectBindings, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
								Name:  indexRuleBinding.Subject.GetName(),
								Group: group,
							}))
							if subjectBindings == nil {
								// Nothing recorded for this subject; leave the
								// switch and fall through to the error check.
								break
							}
							bindingKey := getKey(indexRuleBinding.GetMetadata())
							subjectBindings.(*sync.Map).Delete(bindingKey)
							// Backward map: keyed by each rule the binding names,
							// holding a *sync.Map keyed by the same binding key.
							for _, ruleName := range indexRuleBinding.Rules {
								ruleBindings, _ := sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{
									Name:  ruleName,
									Group: group,
								}))
								if ruleBindings == nil {
									continue
								}
								ruleBindings.(*sync.Map).Delete(bindingKey)
							}
						}
					}
					if err != nil && !errors.Is(err, schema.ErrClosed) {
						// Do not log or retry once shutdown has been signalled.
						select {
						case <-sr.closer.CloseNotify():
							return
						default:
						}
						sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
						sr.metrics.totalErrs.Inc(1)
						// Re-submit from a separate goroutine; presumably so this
						// worker is not blocked sending to the channel it reads
						// from (confirm against SendMetadataEvent).
						go func() {
							sr.SendMetadataEvent(evt)
							sr.metrics.totalRetries.Inc(1)
						}()
					}
				case <-sr.closer.CloseNotify():
					return
				}
			}
		}()
	}
}