in banyand/metadata/schema/watcher.go [214:275]
// periodicSync reconciles the local cache with the keys currently stored
// under the w.key prefix and replays every difference to the registered
// handlers.
//
// It fetches all keys with the prefix, hashes each value and then, while
// holding w.mu:
//   - removes cached keys that are no longer present and calls OnDelete;
//   - calls OnAddOrUpdate for cached keys whose value hash changed;
//   - calls OnAddOrUpdate for fetched keys that are not cached yet.
//
// For updates and additions the cache entry is written only after
// getFromStore succeeded, so a failed load is detected again by the next
// call. Load failures are logged together with the affected key; context
// cancellation is not logged, matching the handling of the initial fetch.
// When no handlers are registered the method logs at panic level.
func (w *watcher) periodicSync() {
	resp, err := w.cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix())
	if err != nil {
		if !errors.Is(err, context.Canceled) {
			w.l.Error().Err(err).Msg("periodic sync failed to fetch keys")
		}
		return
	}

	// Snapshot of what the store holds right now, keyed by the full key.
	currentState := make(map[string]cacheEntry, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		currentState[string(kv.Key)] = cacheEntry{
			valueHash:   convert.Hash(kv.Value),
			modRevision: kv.ModRevision,
		}
	}

	// NOTE(review): w.handlers is read before w.mu is taken; confirm that
	// handler registration cannot run concurrently with this method.
	handlers := w.handlers
	if len(handlers) == 0 {
		w.l.Panic().Msg("no handlers registered")
		return
	}

	// reportLoadErr makes a failed getFromStore visible instead of silently
	// skipping the notification. Cancellation is expected on shutdown and is
	// therefore not reported.
	reportLoadErr := func(key, action string, loadErr error) {
		if errors.Is(loadErr, context.Canceled) {
			return
		}
		w.l.Error().Err(loadErr).Str("key", key).Str("action", action).
			Msg("periodic sync failed to load metadata from store")
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	// Detect deletions and changes. Deleting from, or overwriting existing
	// keys of, a map while ranging over it is permitted in Go.
	for cachedKey, cachedEntry := range w.cache {
		currentEntry, exists := currentState[cachedKey]
		if !exists {
			// The entry is dropped before the load, so a failed load is not
			// retried by a later sync.
			// NOTE(review): the key is absent from the fetched snapshot at
			// this point; confirm getFromStore can still resolve it,
			// otherwise OnDelete is never delivered.
			delete(w.cache, cachedKey)
			md, loadErr := w.getFromStore(cachedKey)
			if loadErr != nil {
				reportLoadErr(cachedKey, "delete", loadErr)
				continue
			}
			for i := range handlers {
				handlers[i].OnDelete(*md)
			}
			continue
		}
		if currentEntry.valueHash == cachedEntry.valueHash {
			continue
		}
		md, loadErr := w.getFromStore(cachedKey)
		if loadErr != nil {
			// Keep the stale entry so the change is detected again.
			reportLoadErr(cachedKey, "update", loadErr)
			continue
		}
		for i := range handlers {
			handlers[i].OnAddOrUpdate(*md)
		}
		w.cache[cachedKey] = currentEntry
	}

	// Detect additions: keys present in the store but not in the cache.
	for key, entry := range currentState {
		if _, exists := w.cache[key]; exists {
			continue
		}
		md, loadErr := w.getFromStore(key)
		if loadErr != nil {
			// Leave the key uncached so the addition is detected again.
			reportLoadErr(key, "add", loadErr)
			continue
		}
		for i := range handlers {
			handlers[i].OnAddOrUpdate(*md)
		}
		w.cache[key] = entry
	}
}