in src/main/java/com/uber/rss/clients/MultiServerSocketReadClient.java [135:179]
/**
 * Connects to the server replication group at {@code nextClientIndex} and, on success,
 * stores the connected client in {@code currentClient} and advances {@code nextClientIndex}.
 *
 * <p>Connection attempts are driven by {@code RetryUtils.retryUntilNotNull}, using the
 * interval and max-time values from {@code clientRetryOptions}. Each failed attempt is
 * reported to M3 stats, logged as a warning, and the partially created client is closed.
 *
 * @throws RssException if {@code nextClientIndex} does not refer to an existing entry in
 *     {@code servers}, or if no client could be connected and the last recorded failure
 *     (if any) was not a {@link RuntimeException}
 * @throws RuntimeException the last recorded failure, rethrown unchanged, when no client
 *     could be connected and that failure was itself a {@link RuntimeException}
 */
private void connectAndInitializeClient() {
    // Valid indices are 0 .. size-1; an index equal to size() means every server has
    // already been consumed, so it must be rejected here rather than by servers.get().
    if (nextClientIndex >= servers.size()) {
        throw new RssException(String.format("Invalid operation, next client index %s, total servers %s", nextClientIndex, servers.size()));
    }

    ServerReplicationGroup serverReplicationGroup = servers.get(nextClientIndex);
    logger.info(String.format("Fetching data from server: %s (%s out of %s), partition: %s", serverReplicationGroup, nextClientIndex + 1, servers.size(), appShufflePartitionId));

    // Holds the most recent failure so it can be surfaced if every attempt fails.
    ExceptionWrapper<Throwable> exceptionWrapper = new ExceptionWrapper<>();
    String failMsg = String.format("Failed to connect to server: %s, partition: %s", serverReplicationGroup, appShufflePartitionId);

    ReplicatedReadClient newClient = RetryUtils.retryUntilNotNull(
        clientRetryOptions.getRetryIntervalMillis(),
        clientRetryOptions.getRetryIntervalMillis() * 10,
        clientRetryOptions.getRetryMaxMillis(),
        () -> {
            ReplicatedReadClient aClient = null;
            try {
                aClient = new ReplicatedReadClient(serverReplicationGroup,
                    timeoutMillis,
                    clientRetryOptions,
                    user,
                    appShufflePartitionId,
                    readClientDataOptions,
                    checkShuffleReplicaConsistency);
                aClient.connect();
                return aClient;
            } catch (Throwable ex) {
                M3Stats.addException(ex, this.getClass().getSimpleName());
                logger.warn(failMsg, ex);
                exceptionWrapper.setException(ex);
                // aClient is still null if the constructor itself threw;
                // NOTE(review): closeClient is presumably null-tolerant - confirm.
                closeClient(aClient);
                // Returning null signals this attempt failed.
                return null;
            }
        });

    if (newClient == null) {
        Throwable lastFailure = exceptionWrapper.getException();
        if (lastFailure == null) {
            throw new RssException(failMsg);
        } else if (lastFailure instanceof RuntimeException) {
            // Rethrow runtime failures unchanged so their original type is preserved.
            throw (RuntimeException) lastFailure;
        } else {
            throw new RssException(failMsg, lastFailure);
        }
    }

    this.currentClient = newClient;
    nextClientIndex++;
}