func()

in banyand/metadata/schema/watcher.go [214:275]


// periodicSync reconciles the watcher's local cache with the keys currently
// stored under the w.key prefix and replays every difference to the
// registered handlers.
//
// It fetches all keys with the w.key prefix, hashes each value, and then,
// while holding w.mu:
//   - drops cached keys that are no longer present and calls OnDelete;
//   - calls OnAddOrUpdate for cached keys whose value hash changed;
//   - calls OnAddOrUpdate for keys that are not in the cache yet.
//
// An added or changed key is written to the cache only after its metadata was
// loaded through getFromStore, so a failed load is picked up again by the next
// sync. A deleted key is removed from the cache before the load, so a failed
// load there is not retried; the failure is logged so it does not go unnoticed.
//
// The method panics (through the logger) when no handlers are registered.
func (w *watcher) periodicSync() {
	resp, err := w.cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix())
	if err != nil {
		// A canceled context is not reported as a failure.
		if !errors.Is(err, context.Canceled) {
			w.l.Error().Err(err).Msg("periodic sync failed to fetch keys")
		}
		return
	}

	// Snapshot of what the store holds right now, keyed by the full key.
	currentState := make(map[string]cacheEntry, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		currentState[string(kv.Key)] = cacheEntry{
			valueHash:   convert.Hash(kv.Value),
			modRevision: kv.ModRevision,
		}
	}

	// NOTE(review): w.handlers is read before w.mu is taken; confirm that the
	// handler list is immutable once syncing has started.
	handlers := w.handlers
	if len(handlers) == 0 {
		w.l.Panic().Msg("no handlers registered")
		return
	}

	// logLoadFailure reports a getFromStore error that would otherwise be
	// dropped silently. Cancellation is skipped, matching the fetch above.
	logLoadFailure := func(key string, loadErr error) {
		if errors.Is(loadErr, context.Canceled) {
			return
		}
		w.l.Warn().Err(loadErr).Str("key", key).Msg("periodic sync failed to load metadata")
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	// Detect deletions and changes. Deleting from a map while ranging over it
	// is well-defined in Go.
	for cachedKey, cachedEntry := range w.cache {
		currentEntry, exists := currentState[cachedKey]
		if !exists {
			// Handle deletion: the entry leaves the cache regardless of
			// whether the handlers can be notified.
			delete(w.cache, cachedKey)
			md, loadErr := w.getFromStore(cachedKey)
			if loadErr != nil {
				logLoadFailure(cachedKey, loadErr)
				continue
			}
			for i := range handlers {
				handlers[i].OnDelete(*md)
			}
			continue
		}

		if currentEntry.valueHash == cachedEntry.valueHash {
			continue
		}

		// Handle update: keep the stale cache entry on failure so the next
		// sync sees the difference again.
		md, loadErr := w.getFromStore(cachedKey)
		if loadErr != nil {
			logLoadFailure(cachedKey, loadErr)
			continue
		}
		for i := range handlers {
			handlers[i].OnAddOrUpdate(*md)
		}
		w.cache[cachedKey] = currentEntry
	}

	// Detect additions: keys in the store that the cache does not know yet.
	for key, entry := range currentState {
		if _, exists := w.cache[key]; exists {
			continue
		}
		md, loadErr := w.getFromStore(key)
		if loadErr != nil {
			logLoadFailure(key, loadErr)
			continue
		}
		for i := range handlers {
			handlers[i].OnAddOrUpdate(*md)
		}
		w.cache[key] = entry
	}
}