in api/cmd/root.go [93:147]
// etcdConnectionChecker starts two background goroutines that watch over the
// etcd-backed stores and returns the cancel function that stops both of them.
//
// The first goroutine calls Sync on the etcd client every 10 seconds, with a
// 5 second timeout per attempt. Each failure is counted and logged. On the
// first successful Sync after one or more failures, Init is called on every
// store visited by store.RangeStore.
//
// The second goroutine calls store.ReInit every 2 minutes, covering the case
// where an etcd watch has exited on its own.
func etcdConnectionChecker() context.CancelFunc {
	ctx, cancel := context.WithCancel(context.TODO())

	go func() {
		etcdClient := storage.GenEtcdStorage().GetClient()
		// Number of consecutive failed Sync attempts since the last success.
		unavailableTimes := 0

		// A single ticker for the goroutine's lifetime; calling time.Tick
		// inside the loop would allocate a fresh ticker on every iteration.
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			sCtx, sCancel := context.WithTimeout(ctx, 5*time.Second)
			err := etcdClient.Sync(sCtx)
			sCancel()
			if err != nil {
				unavailableTimes++
				log.Errorf("etcd connection loss detected, times: %d", unavailableTimes)
				continue
			}

			if unavailableTimes == 0 {
				log.Info("etcd connection is fine")
				continue
			}

			// The connection is back after at least one failure: force a
			// full re-initialization of every store.
			log.Warnf("etcd connection recovered, but after several connection losses, reinitializing stores, times: %d", unavailableTimes)
			unavailableTimes = 0
			store.RangeStore(func(key store.HubKey, s *store.GenericStore) bool {
				log.Warnf("etcd store reinitializing: resource: %s", key)
				if err := s.Init(); err != nil {
					log.Errorf("etcd store reinitialize failed: resource: %s, error: %s", key, err)
				}
				// Always true so one failing store does not prevent the
				// remaining ones from being visited (presumably RangeStore
				// stops on false — confirm against its implementation).
				return true
			})
		}
	}()

	// Timed re-initialization when etcd watch actively exits.
	go func() {
		ticker := time.NewTicker(2 * time.Minute)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				// Stop together with the connection checker so the returned
				// cancel function does not leave this goroutine running.
				return
			case <-ticker.C:
				if err := store.ReInit(); err != nil {
					log.Errorf("resource re-initialize failed, err: %v", err)
				}
			}
		}
	}()

	return cancel
}