in pkg/hostmgr/mesos/mesos-go/detector/standalone.go [116:191]
func (s *Standalone) _poller(pf fetcherFunc) {
defer func() {
defer s.Cancel()
log.Warn("shutting down standalone master detection")
}()
if s.initial == nil {
log.Errorf("aborting master poller since initial master info is nil")
return
}
addr := s.initial.GetHostname()
if len(addr) == 0 {
if s.initial.GetIp() == 0 {
log.Warn("aborted mater poller since initial master info has no host")
return
}
ip := make([]byte, 4)
binary.BigEndian.PutUint32(ip, s.initial.GetIp())
addr = net.IP(ip).To4().String()
}
port := uint32(s.assumedMasterPort)
if s.initial.Port != nil && *s.initial.Port != 0 {
port = *s.initial.Port
}
addr = net.JoinHostPort(addr, strconv.Itoa(int(port)))
log.Infof("polling for master leadership at '%v'", addr)
var lastpid *upid.UPID
for {
startedAt := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), s.leaderSyncInterval)
if pid, err := pf(ctx, addr); err == nil {
if !pid.Equal(lastpid) {
log.Infof("detected leadership change from '%v' to '%v'", lastpid, pid)
lastpid = pid
elapsed := time.Now().Sub(startedAt)
mi := CreateMasterInfo(pid)
select {
case s.ch <- mi: // noop
case <-time.After(s.leaderSyncInterval - elapsed):
// no one heard the master change, oh well - poll again
goto continuePolling
case <-s.done:
cancel()
return
}
} else {
log.Infof("no change to master leadership: '%v'", lastpid)
}
} else if err == context.DeadlineExceeded {
if lastpid != nil {
lastpid = nil
select {
case s.ch <- nil: // lost master
case <-s.done: // no need to cancel ctx
return
}
}
goto continuePolling
} else {
select {
case <-s.done:
cancel()
return
default:
if err != context.Canceled {
log.Error(err)
}
}
}
if remaining := s.leaderSyncInterval - time.Now().Sub(startedAt); remaining > 0 {
log.Infof("master leader poller sleeping for %v", remaining)
time.Sleep(remaining)
}
continuePolling:
cancel()
}
}