func etcdConnectionChecker()

in api/cmd/root.go [93:147]


// etcdConnectionChecker starts two background goroutines that keep the
// resource stores usable across etcd disruptions, and returns the function
// that stops both of them.
//
// The first goroutine calls Sync on the etcd client every 10 seconds, giving
// each attempt a 5-second timeout. Failed attempts are counted; on the first
// successful Sync after one or more failures, every store visited by
// store.RangeStore is re-initialized via Init.
//
// The second goroutine calls store.ReInit every 2 minutes; per the original
// author's note this covers the case where an etcd watch has exited on its
// own.
//
// Calling the returned context.CancelFunc terminates both goroutines and
// releases their tickers.
func etcdConnectionChecker() context.CancelFunc {
	ctx, cancel := context.WithCancel(context.TODO())

	go func() {
		etcdClient := storage.GenEtcdStorage().GetClient()

		// A single ticker is reused for the lifetime of the goroutine; calling
		// time.Tick inside the loop would allocate a new, unstoppable ticker
		// on every iteration.
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		// Number of consecutive failed Sync attempts since the last success.
		unavailableTimes := 0

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			sCtx, sCancel := context.WithTimeout(ctx, 5*time.Second)
			err := etcdClient.Sync(sCtx)
			sCancel()
			if err != nil {
				// A failure caused by our own shutdown is not a connection loss.
				if ctx.Err() != nil {
					return
				}
				unavailableTimes++
				log.Errorf("etcd connection loss detected, times: %d", unavailableTimes)
				continue
			}

			if unavailableTimes == 0 {
				log.Info("etcd connection is fine")
				continue
			}

			// The connection is back after at least one failed probe.
			log.Warnf("etcd connection recovered, but after several connection losses, reinitializing stores, times: %d", unavailableTimes)
			unavailableTimes = 0

			// Force a full re-initialization of every registered store. An
			// Init failure is logged and iteration continues with the
			// remaining stores.
			store.RangeStore(func(key store.HubKey, s *store.GenericStore) bool {
				log.Warnf("etcd store reinitializing: resource: %s", key)
				if err := s.Init(); err != nil {
					log.Errorf("etcd store reinitialize failed: resource: %s, error: %s", key, err)
				}
				return true
			})
		}
	}()

	// Timed re-initialization when etcd watch actively exits.
	go func() {
		ticker := time.NewTicker(2 * time.Minute)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				// Stop together with the connection checker so this goroutine
				// does not outlive the caller's cancel.
				return
			case <-ticker.C:
				if err := store.ReInit(); err != nil {
					log.Errorf("resource re-initialize failed, err: %v", err)
				}
			}
		}
	}()

	return cancel
}