private void connectAndInitializeClient()

in src/main/java/com/uber/rss/clients/MultiServerSocketReadClient.java [135:179]


  /**
   * Connects a {@link ReplicatedReadClient} to the server replication group at
   * {@code nextClientIndex} and installs it as {@code currentClient}.
   *
   * <p>Client creation and {@code connect()} are attempted through
   * {@code RetryUtils.retryUntilNotNull}, driven by the interval and maximum duration taken
   * from {@code clientRetryOptions}. Every failed attempt is reported to {@code M3Stats},
   * logged as a warning, remembered as the most recent failure, and the (possibly
   * half-constructed) client is handed to {@code closeClient}.
   *
   * <p>On success {@code currentClient} is replaced and {@code nextClientIndex} is advanced
   * by one; on failure neither field is modified.
   *
   * @throws RssException if {@code nextClientIndex} is not a valid index into
   *     {@code servers}, if no client could be obtained and no exception was recorded, or
   *     if the most recent recorded failure was not a {@link RuntimeException} (it is then
   *     attached as the cause)
   * @throws RuntimeException the most recent recorded failure, rethrown unchanged, when it
   *     is itself a {@link RuntimeException}
   */
  private void connectAndInitializeClient() {
    // Valid indexes are 0 .. size-1, so the guard must reject index == size as well;
    // otherwise servers.get(...) below fails with a bare IndexOutOfBoundsException
    // instead of this descriptive error.
    if (nextClientIndex >= servers.size()) {
      throw new RssException(String.format("Invalid operation, next client index %s, total servers %s", nextClientIndex, servers.size()));
    }

    ServerReplicationGroup serverReplicationGroup = servers.get(nextClientIndex);
    logger.info(String.format("Fetching data from server: %s (%s out of %s), partition: %s", serverReplicationGroup, nextClientIndex + 1, servers.size(), appShufflePartitionId));

    // Holds the failure from the latest attempt so it can be surfaced once retries give up.
    ExceptionWrapper<Throwable> exceptionWrapper = new ExceptionWrapper<>();
    String failMsg = String.format("Failed to connect to server: %s, partition: %s", serverReplicationGroup, appShufflePartitionId);

    // NOTE(review): the second argument (10x the base interval) is presumably the maximum
    // back-off interval accepted by retryUntilNotNull - confirm against RetryUtils.
    ReplicatedReadClient newClient = RetryUtils.retryUntilNotNull(clientRetryOptions.getRetryIntervalMillis(), clientRetryOptions.getRetryIntervalMillis()*10, clientRetryOptions.getRetryMaxMillis(), () -> {
      ReplicatedReadClient aClient = null;
      try {
        aClient = new ReplicatedReadClient(serverReplicationGroup,
            timeoutMillis,
            clientRetryOptions,
            user,
            appShufflePartitionId,
            readClientDataOptions,
            checkShuffleReplicaConsistency);
        aClient.connect();
        return aClient;
      } catch (Throwable ex) {
        M3Stats.addException(ex, this.getClass().getSimpleName());
        logger.warn(failMsg, ex);
        exceptionWrapper.setException(ex);
        // Release whatever the failed attempt created; aClient is still null here when
        // the constructor itself threw.
        closeClient(aClient);
        // A null result signals a failed attempt to the retry helper.
        return null;
      }
    });

    if (newClient == null) {
      Throwable lastFailure = exceptionWrapper.getException();
      if (lastFailure == null) {
        throw new RssException(failMsg);
      } else if (lastFailure instanceof RuntimeException) {
        // Rethrow unchanged so callers observe the original runtime exception type.
        throw (RuntimeException) lastFailure;
      } else {
        throw new RssException(failMsg, lastFailure);
      }
    }

    this.currentClient = newClient;
    nextClientIndex++;
  }