func()

in store/engine/etcd/etcd.go [256:292]


// observeLeaderEvent follows the etcd election and publishes leader changes
// until the engine is asked to quit.
//
// It first blocks until an election arrives on e.electionCh (returning early
// if e.quitCh fires first) and then multiplexes three event sources:
//
//   - the election's Observe channel: a response carrying at least one KV
//     records Kvs[0].Value as e.leaderID (under e.leaderMu) and sends true on
//     e.leaderChangeCh; a response without KVs sends false and restarts the
//     observation on the same election;
//   - e.electionCh: a replacement election, on which observation restarts;
//   - e.quitCh: terminates the loop.
//
// e.isReady is set to true on every response received from the Observe
// channel. e.wg.Done is called when the method returns, so the caller is
// expected to have done the matching e.wg.Add before starting it.
func (e *Etcd) observeLeaderEvent(ctx context.Context) {
	defer e.wg.Done()

	var election *concurrency.Election
	select {
	case elect := <-e.electionCh:
		election = elect
	case <-e.quitCh:
		return
	}

	// notify publishes a leader-change event, but gives up when quitCh fires
	// so that a consumer that has already stopped reading cannot keep this
	// goroutine (and therefore e.wg) blocked forever. It reports whether the
	// event was delivered.
	notify := func(hasLeader bool) bool {
		select {
		case e.leaderChangeCh <- hasLeader:
			return true
		case <-e.quitCh:
			return false
		}
	}

	ch := election.Observe(ctx)
	for {
		select {
		case resp := <-ch:
			e.isReady.Store(true)

			hasLeader := len(resp.Kvs) > 0
			if hasLeader {
				e.leaderMu.Lock()
				e.leaderID = string(resp.Kvs[0].Value)
				e.leaderMu.Unlock()
			}
			// Every observed event is published, including repeats of the
			// same leader ID: the previous "same leader" check ran after the
			// assignment above, so it never filtered anything.
			if !notify(hasLeader) {
				logger.Get().Info("Exit the leader change observe loop")
				return
			}
			if hasLeader {
				continue
			}

			// No KVs: this is also what a receive from a closed Observe
			// channel yields (zero-value response). NOTE(review): etcd
			// documents that the Observe channel closes when the context is
			// canceled; re-observing with a canceled context would only
			// produce another closed channel and spin, so stop instead.
			if ctx.Err() != nil {
				logger.Get().Info("Exit the leader change observe loop")
				return
			}
			ch = election.Observe(ctx)
		case elect := <-e.electionCh:
			election = elect
			ch = election.Observe(ctx)
		case <-e.quitCh:
			logger.Get().Info("Exit the leader change observe loop")
			return
		}
	}
}