boolean findCoordinator()

in geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java [1149:1265]


  /**
   * Asks the configured locators which member should act as membership coordinator and stores the
   * selected member in {@code searchState.possibleCoordinator}.
   *
   * <p>If the last search did not reach a locator that is already part of a cluster, at least as
   * many registrants as locators are known, and a view newer than the one last used for this
   * shortcut is available, the decision is delegated to {@code findCoordinatorFromView()} instead.
   *
   * <p>Otherwise a {@code FindCoordinatorRequest} is sent to every locator. Passes over the locator
   * list are repeated until some locator names a coordinator or the configured locator-wait-time
   * has elapsed. Among the reported coordinators, those accompanied by a membership view are
   * preferred, and the oldest according to the member factory's comparator is chosen. A locator
   * rejection message is raised as a {@code MembershipConfigurationException}.
   *
   * @return on the view-based path, the result of {@code findCoordinatorFromView()}; otherwise
   *         {@code true} if a possible coordinator was selected and {@code false} if no locator
   *         reported a usable coordinator
   * @throws MemberStartupException if the thread is interrupted while waiting to retry the locators
   */
  boolean findCoordinator() throws MemberStartupException {
    SearchState<ID> state = searchState;

    assert localAddress != null;

    // If the previous search reached no locator that has joined a cluster, and a newer view has
    // arrived since this shortcut was last taken, consult the members of that view instead.
    if (!state.hasContactedAJoinedLocator && state.registrants.size() >= locators.size()
        && state.view != null && state.viewId > state.lastFindCoordinatorInViewId) {
      state.lastFindCoordinatorInViewId = state.viewId;
      logger.info("using findCoordinatorFromView");
      return findCoordinatorFromView();
    }

    String dhalgo = services.getConfig().getSecurityUDPDHAlgo();
    FindCoordinatorRequest<ID> request = new FindCoordinatorRequest<>(localAddress,
        state.alreadyTried, state.viewId, services.getMessenger().getPublicKey(localAddress),
        services.getMessenger().getRequestId(), dhalgo);
    Set<ID> possibleCoordinators = new HashSet<>();
    Set<ID> coordinatorsWithView = new HashSet<>();

    long giveUpTime =
        System.currentTimeMillis() + ((long) services.getConfig().getLocatorWaitTime() * 1000L);

    // Multiply in long arithmetic and clamp. Casting to int before doubling could truncate a large
    // member-timeout or overflow into a negative connect timeout.
    int connectTimeout =
        (int) Math.min(Integer.MAX_VALUE, services.getConfig().getMemberTimeout() * 2L);
    boolean anyResponses = false;

    logger.debug("sending {} to {}", request, locators);

    state.hasContactedAJoinedLocator = false;
    state.locatorsContacted = 0;

    do {
      boolean pausedThisPass = false;
      for (HostAndPort laddr : locators) {
        try {
          Object o = locatorClient.requestToServer(laddr, request, connectTimeout, true);
          FindCoordinatorResponse<ID> response =
              (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse<ID>) o : null;
          if (response != null) {
            if (response.getRejectionMessage() != null) {
              throw new MembershipConfigurationException(response.getRejectionMessage());
            }
            setCoordinatorPublicKey(response);
            state.locatorsContacted++;
            if (response.getRegistrants() != null) {
              state.registrants.addAll(response.getRegistrants());
            }
            logger.info("received {} from locator {}", response, laddr);
            // A sender with a non-negative view id is already a member of a running cluster.
            if (!state.hasContactedAJoinedLocator && response.getSenderId() != null
                && response.getSenderId().getVmViewId() >= 0) {
              logger.info("Locator's address indicates it is part of a distributed system "
                  + "so I will not become membership coordinator on this attempt to join");
              state.hasContactedAJoinedLocator = true;
            }
            ID responseCoordinator = response.getCoordinator();
            if (responseCoordinator != null) {
              anyResponses = true;
              GMSMembershipView<ID> v = response.getView();
              int viewId = v == null ? -1 : v.getViewId();
              if (viewId > state.viewId) {
                state.viewId = viewId;
                state.view = v;
                // NOTE(review): this also discards the registrants added from this same response
                // a few lines above; confirm that is intended.
                state.registrants.clear();
              }
              // if this node is restarting it should never create its own cluster because
              // the QuorumChecker would have contacted a quorum of live nodes and one of
              // them should already be the coordinator, or should become the coordinator soon
              boolean isMyOldAddress =
                  services.getConfig().isReconnecting() && localAddress.equals(responseCoordinator)
                      && responseCoordinator.getVmViewId() >= 0;
              if (!isMyOldAddress) {
                possibleCoordinators.add(responseCoordinator);
                // Only record it here as well, so the view-preferred selection below cannot
                // bring back the old address that was just excluded.
                if (viewId > -1) {
                  coordinatorsWithView.add(responseCoordinator);
                }
              }
            }
          }
        } catch (IOException | ClassNotFoundException problem) {
          logger.info("Unable to contact locator " + laddr + ": " + problem);
          logger.debug("Exception thrown when contacting a locator", problem);
          state.lastLocatorException = problem;
          // Back off after each failure only while no locator has answered at all.
          if (state.locatorsContacted == 0 && System.currentTimeMillis() < giveUpTime) {
            pauseBeforeLocatorRetry();
            pausedThisPass = true;
          }
        }
      }
      // Without this pause, a pass in which no locator named a coordinator is retried immediately.
      // That happens when locators answer without a coordinator, or fail after another locator
      // has already answered. Those locators would then be polled in a tight loop until giveUpTime.
      if (!anyResponses && !pausedThisPass && System.currentTimeMillis() < giveUpTime) {
        pauseBeforeLocatorRetry();
      }
    } while (!anyResponses && System.currentTimeMillis() < giveUpTime);

    if (possibleCoordinators.isEmpty()) {
      return false;
    }

    // Coordinators that were reported together with a membership view are preferred.
    Set<ID> candidates =
        coordinatorsWithView.isEmpty() ? possibleCoordinators : coordinatorsWithView;

    // Pick the oldest candidate according to the member factory's ordering.
    Iterator<ID> candidateIterator = candidates.iterator();
    ID oldest = candidateIterator.next();
    while (candidateIterator.hasNext()) {
      ID candidate = candidateIterator.next();
      if (services.getMemberFactory().getComparator().compare(oldest, candidate) > 0) {
        oldest = candidate;
      }
    }
    state.possibleCoordinator = oldest;

    logger.info("findCoordinator chose {} out of these possible coordinators: {}",
        state.possibleCoordinator, candidates);
    return true;
  }

  /**
   * Sleeps for {@code FIND_LOCATOR_RETRY_SLEEP} milliseconds before the locators are contacted
   * again.
   *
   * @throws MemberStartupException if the thread is interrupted while sleeping; the interrupt flag
   *         is restored and the cancel criterion is consulted before this is thrown
   */
  private void pauseBeforeLocatorRetry() throws MemberStartupException {
    try {
      Thread.sleep(FIND_LOCATOR_RETRY_SLEEP);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      services.getCancelCriterion().checkCancelInProgress(e);
      throw new MemberStartupException("Interrupted while trying to contact locators");
    }
  }