in src/main/java/com/uber/rss/clients/ServerIdAwareSyncWriteClient.java [98:153]
private ConnectUploadResponse connectImpl(ServerDetail serverDetail, ServerConnectionRefresher refresher, boolean finishUploadAck) {
ServerHostAndPort hostAndPort = ServerHostAndPort.fromString(serverDetail.getConnectionString());
ConnectUploadResponse uploadServerVerboseInfo;
try {
if (!usePooledConnection) {
writeClient = UnpooledWriteClientFactory.getInstance().getOrCreateClient(
hostAndPort.getHost(),
hostAndPort.getPort(),
timeoutMillis,
finishUploadAck,
user,
appId,
appAttempt,
shuffleWriteConfig);
} else {
writeClient = PooledWriteClientFactory.getInstance().getOrCreateClient(
hostAndPort.getHost(),
hostAndPort.getPort(),
timeoutMillis,
finishUploadAck,
user,
appId,
appAttempt,
shuffleWriteConfig);
}
uploadServerVerboseInfo = writeClient.connect();
} catch (RssNetworkException ex) {
closeUnderlyingClient();
if (refresher == null) {
throw ex;
} else {
logger.warn(String.format("Failed to connect, retrying: %s", serverDetail), ex);
ServerDetail newServerDetail = refresher.refreshConnection(serverDetail);
logger.info(String.format("Retry with %s for %s", newServerDetail, serverDetail));
return connectImpl(newServerDetail, null, finishUploadAck);
}
} catch (Throwable ex) {
close();
throw ex;
}
if (!uploadServerVerboseInfo.getServerId().equals(serverDetail.getServerId())) {
close();
String msg = String.format("Server id (%s) is not expected (%s)", uploadServerVerboseInfo.getServerId(), serverDetail);
throw new RssInvalidServerIdException(msg);
} else if (uploadServerVerboseInfo.getRunningVersion() != null && !uploadServerVerboseInfo.getRunningVersion().equals(serverDetail.getRunningVersion())) {
close();
String msg = String.format("Server version (%s) is not expected (%s)", uploadServerVerboseInfo.getRunningVersion(), serverDetail);
throw new RssInvalidServerVersionException(msg);
}
return uploadServerVerboseInfo;
}