func()

in pkg/hostmgr/mesos/mesos-go/detector/standalone.go [116:191]


// _poller repeatedly asks the configured master who the current leader is and
// publishes leadership changes on s.ch until s.done becomes readable.
//
// The polled address is derived from s.initial: its hostname when set,
// otherwise the dotted-quad form of GetIp() (decoded as a big-endian uint32).
// The port is s.initial.Port when present and non-zero, else
// s.assumedMasterPort. The poller aborts immediately when s.initial is nil or
// carries neither a hostname nor an IP.
//
// Each iteration calls pf with a context bounded by s.leaderSyncInterval:
//
//   - success, pid differs from the last announced pid: a MasterInfo built from
//     pid is offered on s.ch for the remainder of the interval. The pid is only
//     remembered once a receiver actually takes it, so an undelivered change is
//     offered again on the next poll.
//   - context.DeadlineExceeded: if a leader had been announced, nil is sent on
//     s.ch (blocking until received or s.done) to signal the master was lost.
//   - any other error: logged, unless it is context.Canceled.
//
// s.Cancel() is always invoked when the poller exits.
func (s *Standalone) _poller(pf fetcherFunc) {
	defer func() {
		defer s.Cancel()
		log.Warn("shutting down standalone master detection")
	}()
	if s.initial == nil {
		log.Errorf("aborting master poller since initial master info is nil")
		return
	}
	addr := s.initial.GetHostname()
	if len(addr) == 0 {
		if s.initial.GetIp() == 0 {
			log.Warn("aborted master poller since initial master info has no host")
			return
		}
		// No hostname: fall back to the numeric IPv4 address.
		ip := make([]byte, 4)
		binary.BigEndian.PutUint32(ip, s.initial.GetIp())
		addr = net.IP(ip).To4().String()
	}
	port := uint32(s.assumedMasterPort)
	if s.initial.Port != nil && *s.initial.Port != 0 {
		port = *s.initial.Port
	}
	addr = net.JoinHostPort(addr, strconv.Itoa(int(port)))
	log.Infof("polling for master leadership at '%v'", addr)

	// lastpid is the leader most recently delivered to a listener (nil if none).
	var lastpid *upid.UPID
	for {
		startedAt := time.Now()
		ctx, cancel := context.WithTimeout(context.Background(), s.leaderSyncInterval)
		pid, err := pf(ctx, addr)
		// ctx only bounds the fetch above; release its timer right away so no
		// exit path below can forget to do so.
		cancel()

		switch {
		case err == nil:
			if pid.Equal(lastpid) {
				log.Infof("no change to master leadership: '%v'", lastpid)
			} else {
				log.Infof("detected leadership change from '%v' to '%v'", lastpid, pid)
				select {
				case s.ch <- CreateMasterInfo(pid):
					// Remember the leader only after it was delivered; otherwise
					// a missed notification would never be retried.
					lastpid = pid
				case <-time.After(s.leaderSyncInterval - time.Since(startedAt)):
					// no one heard the master change, oh well - poll again
					continue
				case <-s.done:
					return
				}
			}
		case err == context.DeadlineExceeded:
			if lastpid != nil {
				lastpid = nil
				select {
				case s.ch <- nil: // lost master
				case <-s.done:
					return
				}
			}
			// The whole interval was spent waiting on the fetch; poll again now.
			continue
		default:
			select {
			case <-s.done:
				return
			default:
				if err != context.Canceled {
					log.Error(err)
				}
			}
		}

		// Wait out the rest of the interval, but wake up promptly on shutdown.
		if remaining := s.leaderSyncInterval - time.Since(startedAt); remaining > 0 {
			log.Infof("master leader poller sleeping for %v", remaining)
			select {
			case <-time.After(remaining):
			case <-s.done:
				return
			}
		}
	}
}