// banyand/metadata/schema/watcher.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
// watchEventHandler receives notifications from a watcher as entries under
// the watched etcd key prefix change.
type watchEventHandler interface {
	// OnAddOrUpdate is invoked when a key is created or its value changes.
	OnAddOrUpdate(Metadata)
	// OnDelete is invoked when a key is removed; the Metadata is built from
	// the key's previous value.
	OnDelete(Metadata)
}
// watcherConfig carries the construction parameters for a watcher.
type watcherConfig struct {
	key           string        // etcd key prefix to watch
	revision      int64         // revision to resume from; a value < 1 triggers an initial full sync in Start
	kind          Kind          // schema kind used to unmarshal stored key-values
	checkInterval time.Duration // interval between periodic full syncs; 0 selects the 5-minute default
}
// cacheEntry is the watcher's per-key bookkeeping record, used to drop
// stale/duplicate watch events and to detect changes during periodic sync.
type cacheEntry struct {
	valueHash   uint64 // hash of the stored value (convert.Hash); compared in periodicSync to detect changes
	modRevision int64  // last observed ModRevision; PUT events at or below it are ignored in handle
}
// watcher follows a key prefix in etcd, maintains a local cache of the
// entries under it, and dispatches add/update/delete events to the
// registered handlers. A periodic full sync reconciles the cache with
// etcd to repair anything the incremental watch stream missed.
type watcher struct {
	cli           *clientv3.Client      // etcd client used for watching and direct reads
	closer        *run.Closer           // coordinates shutdown of the watch goroutine
	l             *logger.Logger        // structured logger
	ticker        *time.Ticker          // triggers periodicSync every checkInterval; created in watch
	cache         map[string]cacheEntry // per-key state, keyed by full etcd key; guarded by mu
	key           string                // watched key prefix
	handlers      []watchEventHandler   // event sinks; appended via AddHandler, read without locking by the watch goroutine
	revision      int64                 // last etcd revision processed by the watch loop
	kind          Kind                  // schema kind used to unmarshal key-values
	checkInterval time.Duration         // period between full syncs
	mu            sync.RWMutex          // protects cache
	startOnce     sync.Once             // makes Start idempotent
}
// newWatcher builds a watcher for the key prefix described by wc.
// A zero checkInterval falls back to five minutes between periodic syncs.
// The returned watcher is idle until Start is called.
func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger) *watcher {
	interval := wc.checkInterval
	if interval == 0 {
		// Default cadence for the reconciliation sweep.
		interval = 5 * time.Minute
	}
	return &watcher{
		cli:           cli,
		key:           wc.key,
		kind:          wc.kind,
		revision:      wc.revision,
		closer:        run.NewCloser(1),
		l:             l,
		checkInterval: interval,
		cache:         make(map[string]cacheEntry),
	}
}
// Start brings the watcher online exactly once; later calls are no-ops.
// When no usable starting revision (< 1) was configured, a synchronous
// full sync seeds the cache before the watch goroutine is launched.
func (w *watcher) Start() {
	w.startOnce.Do(func() {
		if w.revision < 1 {
			// Nothing to resume from: populate the cache with a full sync.
			w.periodicSync()
		}
		go w.watch(w.revision)
	})
}
// AddHandler registers a handler to be notified of watch events.
// Register all handlers before calling Start: the handlers slice is read
// without locking by the watch goroutine (see handle and periodicSync),
// so appending concurrently with a running watcher would race.
func (w *watcher) AddHandler(handler watchEventHandler) {
	// append handles a nil slice, so no explicit initialization is needed.
	w.handlers = append(w.handlers, handler)
}
// Close shuts the watcher down: it releases the initial reference the
// closer was created with (run.NewCloser(1) in newWatcher), then signals
// close and waits for the watch goroutine to observe it and exit.
func (w *watcher) Close() {
	w.closer.Done()
	w.closer.CloseThenWait()
}
// watch runs the etcd watch loop until the watcher is closed, observing
// the configured key prefix starting just after the given revision.
//
// A negative revision is used as a sentinel for "state unknown": the loop
// runs a full sync (periodicSync) and then resumes watching from
// w.revision. Both a closed watch channel and a compaction error
// (ErrCompacted) reset the sentinel so the next outer iteration resyncs.
// Independently, a ticker fires periodicSync every checkInterval to
// repair any divergence the watch stream may have missed.
func (w *watcher) watch(revision int64) {
	if !w.closer.AddRunning() {
		// Already closing; do not start work Close would not wait for.
		return
	}
	defer w.closer.Done()
	cli := w.cli
	w.ticker = time.NewTicker(w.checkInterval)
	defer w.ticker.Stop()
OUTER:
	for {
		// Bail out promptly when the watcher has been closed.
		select {
		case <-w.closer.CloseNotify():
			return
		default:
		}
		if revision < 0 {
			// Use periodic sync to recover state and get new revision.
			w.periodicSync()
			revision = w.revision
			continue
		}
		// Resume watching from the revision AFTER the last processed one.
		// WithPrevKV makes DELETE events carry the previous value so
		// handlers can be told what was removed.
		wch := cli.Watch(w.closer.Ctx(), w.key,
			clientv3.WithPrefix(),
			clientv3.WithRev(revision+1),
			clientv3.WithPrevKV(),
		)
		if wch == nil {
			continue
		}
		for {
			select {
			case <-w.closer.CloseNotify():
				w.l.Info().Msg("watcher closed")
				return
			case <-w.ticker.C:
				w.periodicSync()
			case watchResp, ok := <-wch:
				if !ok {
					// Watch channel closed. Unless we are shutting down,
					// mark state unknown and re-establish from scratch.
					select {
					case <-w.closer.CloseNotify():
						return
					default:
						revision = -1
						continue OUTER
					}
				}
				if err := watchResp.Err(); err != nil {
					if errors.Is(err, v3rpc.ErrCompacted) {
						// The requested revision was compacted away;
						// fall back to a full resync.
						revision = -1
						continue OUTER
					}
					// Other errors: keep consuming the same channel.
					continue
				}
				// Record progress, then apply each event in order.
				w.revision = watchResp.Header.Revision
				for _, event := range watchResp.Events {
					w.handle(event, &watchResp)
				}
			}
		}
	}
}
// handle applies a single etcd watch event to the local cache and fans it
// out to the registered handlers.
//
// PUT events are deduplicated by ModRevision: an event at or below the
// cached revision is dropped, guarding against replays after a watch is
// re-established. DELETE events evict the cache entry and notify handlers
// with the previous value carried by the event.
func (w *watcher) handle(watchEvent *clientv3.Event, watchResp *clientv3.WatchResponse) {
	keyStr := string(watchEvent.Kv.Key)
	entry := cacheEntry{
		valueHash:   convert.Hash(watchEvent.Kv.Value),
		modRevision: watchEvent.Kv.ModRevision,
	}
	handlers := w.handlers
	if len(handlers) == 0 {
		// Programmer error: the watcher is useless without handlers.
		w.l.Panic().Msg("no handlers registered")
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	switch watchEvent.Type {
	case mvccpb.PUT:
		if existing, exists := w.cache[keyStr]; !exists || existing.modRevision < entry.modRevision {
			w.cache[keyStr] = entry
			md, err := w.kind.Unmarshal(watchEvent.Kv)
			if err != nil {
				w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message")
				return
			}
			for i := range handlers {
				handlers[i].OnAddOrUpdate(md)
			}
		}
	case mvccpb.DELETE:
		delete(w.cache, keyStr)
		// The watch is established with WithPrevKV, but PrevKv can still be
		// nil (e.g. the previous revision was compacted away); guard against
		// passing nil to Unmarshal.
		if watchEvent.PrevKv == nil {
			w.l.Warn().Stringer("event_header", &watchResp.Header).Msg("delete event without previous value; skipping handler notification")
			return
		}
		md, err := w.kind.Unmarshal(watchEvent.PrevKv)
		if err != nil {
			w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message")
			return
		}
		for i := range handlers {
			handlers[i].OnDelete(md)
		}
	}
}
// periodicSync reconciles the local cache against a full snapshot of the
// watched key range, repairing anything the incremental watch missed:
// keys added, changed, or removed while the watch was broken. It also
// advances w.revision to the snapshot's revision so the recovery path in
// watch (revision < 0) can resume from a non-compacted point instead of
// retrying the same stale revision.
func (w *watcher) periodicSync() {
	resp, err := w.cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix())
	if err != nil {
		if !errors.Is(err, context.Canceled) {
			w.l.Error().Err(err).Msg("periodic sync failed to fetch keys")
		}
		return
	}
	// Record the snapshot revision; watch reads it back after a compaction
	// or channel failure to restart watching from a valid revision.
	w.revision = resp.Header.Revision
	currentState := make(map[string]cacheEntry, len(resp.Kvs))
	// Keep the raw key-values so additions/updates can be unmarshaled
	// directly from the snapshot instead of issuing one extra Get per key
	// (which could also observe a different, later state).
	currentKvs := make(map[string]*mvccpb.KeyValue, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		keyStr := string(kv.Key)
		currentState[keyStr] = cacheEntry{
			valueHash:   convert.Hash(kv.Value),
			modRevision: kv.ModRevision,
		}
		currentKvs[keyStr] = kv
	}
	handlers := w.handlers
	if len(handlers) == 0 {
		w.l.Panic().Msg("no handlers registered")
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	// Detect deletions and changes.
	for cachedKey, cachedEntry := range w.cache {
		currentEntry, exists := currentState[cachedKey]
		if !exists {
			// The key vanished from etcd. The cache only keeps a hash, so
			// the old value cannot be reconstructed locally; getFromStore is
			// expected to fail here because the key no longer exists. Log
			// instead of silently dropping the notification.
			delete(w.cache, cachedKey)
			if md, err := w.getFromStore(cachedKey); err == nil {
				for i := range handlers {
					handlers[i].OnDelete(*md)
				}
			} else {
				w.l.Warn().Err(err).Msg("cannot reconstruct deleted entry; OnDelete not delivered")
			}
			continue
		}
		if currentEntry.valueHash != cachedEntry.valueHash {
			// Value changed: unmarshal from the snapshot we already hold.
			if md, err := w.kind.Unmarshal(currentKvs[cachedKey]); err == nil {
				for i := range handlers {
					handlers[i].OnAddOrUpdate(md)
				}
				w.cache[cachedKey] = currentEntry
			}
		}
	}
	// Detect additions.
	for key, entry := range currentState {
		if _, exists := w.cache[key]; !exists {
			if md, err := w.kind.Unmarshal(currentKvs[key]); err == nil {
				for i := range handlers {
					handlers[i].OnAddOrUpdate(md)
				}
				w.cache[key] = entry
			}
		}
	}
}
// getFromStore fetches a single key directly from etcd and unmarshals it
// into the watcher's schema kind.
//
// It returns an error when the fetch fails, the key does not exist, or
// the stored value cannot be unmarshaled. On error the returned Metadata
// is nil (the original returned a pointer to a partially-populated value
// alongside a non-nil error).
func (w *watcher) getFromStore(key string) (*Metadata, error) {
	resp, err := w.cli.Get(w.closer.Ctx(), key)
	if err != nil {
		return nil, err
	}
	if len(resp.Kvs) == 0 {
		return nil, errors.New("key not found")
	}
	md, err := w.kind.Unmarshal(resp.Kvs[0])
	if err != nil {
		return nil, err
	}
	return &md, nil
}