in store/engine/etcd/etcd.go [256:292]
// observeLeaderEvent watches the current election and publishes leader changes.
//
// It first waits for an election on e.electionCh (returning early if e.quitCh
// fires), then consumes that election's Observe channel. For every response
// that carries a key, the key's value is stored as e.leaderID under e.leaderMu
// and true is sent on e.leaderChangeCh. A response without keys results in a
// re-subscription and false being sent. Every observed response is reported,
// including one that repeats the current leader. A new election received on
// e.electionCh replaces the one being observed.
//
// The goroutine returns when e.quitCh fires and calls e.wg.Done on exit.
func (e *Etcd) observeLeaderEvent(ctx context.Context) {
	defer e.wg.Done()

	var election *concurrency.Election
	select {
	case elect := <-e.electionCh:
		election = elect
	case <-e.quitCh:
		return
	}

	// notify delivers a leader-change event but gives up as soon as shutdown
	// is requested, so a consumer that stopped reading cannot wedge this
	// goroutine (and with it any e.wg waiter). It returns false on shutdown.
	notify := func(hasLeader bool) bool {
		select {
		case e.leaderChangeCh <- hasLeader:
			return true
		case <-e.quitCh:
			return false
		}
	}

	ch := election.Observe(ctx)
	for {
		select {
		case resp := <-ch:
			e.isReady.Store(true)
			hasLeader := len(resp.Kvs) > 0
			switch {
			case hasLeader:
				e.leaderMu.Lock()
				e.leaderID = string(resp.Kvs[0].Value)
				e.leaderMu.Unlock()
			case ctx.Err() != nil:
				// A response without keys is also what a closed channel
				// yields. Once ctx is done a fresh subscription cannot make
				// progress (presumably it ends right away — see the etcd
				// Election.Observe docs), so re-subscribing would spin.
				// Park on a nil channel, which never becomes ready, until a
				// new election or shutdown arrives.
				ch = nil
			default:
				ch = election.Observe(ctx)
			}
			if !notify(hasLeader) {
				logger.Get().Info("Exit the leader change observe loop")
				return
			}
		case elect := <-e.electionCh:
			election = elect
			ch = election.Observe(ctx)
		case <-e.quitCh:
			logger.Get().Info("Exit the leader change observe loop")
			return
		}
	}
}