in pkg/schema/metadata.go [158:207]
func (sr *schemaRepo) Watcher() {
for i := 0; i < sr.workerNum; i++ {
go func() {
defer func() {
sr.workerCloser.Done()
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("watching the events")
}
}()
for {
select {
case evt, more := <-sr.eventCh:
if !more {
return
}
if e := sr.l.Debug(); e.Enabled() {
e.Interface("event", evt).Msg("received an event")
}
var err error
switch evt.Typ {
case EventAddOrUpdate:
switch evt.Kind {
case EventKindGroup:
_, err = sr.StoreGroup(evt.Metadata)
case EventKindResource:
_, err = sr.storeResource(evt.Metadata)
}
case EventDelete:
switch evt.Kind {
case EventKindGroup:
err = sr.deleteGroup(evt.Metadata)
case EventKindResource:
err = sr.deleteResource(evt.Metadata)
}
}
if err != nil && !errors.Is(err, schema.ErrClosed) {
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
select {
case sr.eventCh <- evt:
case <-sr.workerCloser.CloseNotify():
return
}
}
case <-sr.workerCloser.CloseNotify():
return
}
}
}()
}
}