in pkg/schema/cache.go [149:242]
func (sr *schemaRepo) Watcher() {
for i := 0; i < sr.workerNum; i++ {
go func() {
if !sr.closer.AddReceiver() {
return
}
defer func() {
sr.closer.ReceiverDone()
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("watching the events")
}
sr.metrics.totalPanics.Inc(1)
}()
for {
select {
case evt, more := <-sr.eventCh:
if !more {
return
}
if e := sr.l.Debug(); e.Enabled() {
e.Interface("event", evt).Msg("received an event")
}
var err error
switch evt.Typ {
case EventAddOrUpdate:
switch evt.Kind {
case EventKindGroup:
_, err = sr.storeGroup(evt.Metadata.GetMetadata())
if errors.As(err, schema.ErrGRPCResourceNotFound) {
err = nil
}
case EventKindResource:
err = sr.storeResource(evt.Metadata)
case EventKindIndexRule:
indexRule := evt.Metadata.(*databasev1.IndexRule)
sr.storeIndexRule(indexRule)
case EventKindIndexRuleBinding:
indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding)
sr.storeIndexRuleBinding(indexRuleBinding)
}
case EventDelete:
switch evt.Kind {
case EventKindGroup:
err = sr.deleteGroup(evt.Metadata.GetMetadata())
case EventKindResource:
sr.deleteResource(evt.Metadata.GetMetadata())
case EventKindIndexRule:
key := getKey(evt.Metadata.GetMetadata())
sr.indexRuleMap.Delete(key)
case EventKindIndexRuleBinding:
indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding)
col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Subject.GetName(),
Group: indexRuleBinding.GetMetadata().GetGroup(),
}))
if col == nil {
break
}
tMap := col.(*sync.Map)
key := getKey(indexRuleBinding.GetMetadata())
tMap.Delete(key)
for i := range indexRuleBinding.Rules {
col, _ := sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Rules[i],
Group: indexRuleBinding.GetMetadata().GetGroup(),
}))
if col == nil {
continue
}
tMap := col.(*sync.Map)
tMap.Delete(key)
}
}
}
if err != nil && !errors.Is(err, schema.ErrClosed) {
select {
case <-sr.closer.CloseNotify():
return
default:
}
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
sr.metrics.totalErrs.Inc(1)
go func() {
sr.SendMetadataEvent(evt)
sr.metrics.totalRetries.Inc(1)
}()
}
case <-sr.closer.CloseNotify():
return
}
}
}()
}
}