in src/main/java/com/uber/rss/clients/DataBlockSocketReadClient.java [108:167]
private ConnectDownloadResponse connectImpl() {
if (socket != null) {
throw new RssInvalidStateException(String.format("Already connected to server, cannot connect again: %s", connectionInfo));
}
logger.debug("Connecting to server: {}", connectionInfo);
connectSocket();
write(MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE);
write(MessageConstants.DOWNLOAD_UPLINK_VERSION_3);
ConnectDownloadRequest connectRequest = new ConnectDownloadRequest(user, appShufflePartitionId, fetchTaskAttemptIds);
ExceptionWrapper<RssException> exceptionWrapper = new ExceptionWrapper<>();
Boolean succeeded = RetryUtils.retryUntilNotNull(dataAvailablePollInterval, dataAvailablePollInterval * POLL_INTERVAL_MAX_MULTIPLIER, dataAvailableWaitTime, () -> {
try {
writeControlMessageAndWaitResponseStatus(connectRequest);
return Boolean.TRUE;
} catch (RssShuffleCorruptedException ex) {
throw new RssShuffleCorruptedException("Shuffle data corrupted for: " + appShufflePartitionId, ex);
} catch (RssMissingShuffleWriteConfigException | RssShuffleStageNotStartedException ex) {
exceptionWrapper.setException(ex);
logger.warn(String.format("Did not find data in server side, server may not run fast enough to get data from client or server hits some issue, %s", appShufflePartitionId), ex);
return null;
}
});
if (succeeded == null || !succeeded.booleanValue()) {
if (exceptionWrapper.getException() != null) {
throw exceptionWrapper.getException();
} else {
throw new RssInvalidStateException(String.format("Failed to connect to server %s, %s", connectionInfo, appShufflePartitionId));
}
}
ConnectDownloadResponse connectDownloadResponse = readResponseMessage(MessageConstants.MESSAGE_ConnectDownloadResponse, ConnectDownloadResponse::deserialize);
logger.info("Connected to server: {}, response: {}", connectionInfo, connectDownloadResponse);
fileCompressionCodec = connectDownloadResponse.getCompressionCodec();
if (connectDownloadResponse.isDataAvailable()) {
this.commitMapTaskCommitStatus = connectDownloadResponse.getMapTaskCommitStatus();
if (this.commitMapTaskCommitStatus == null) {
throw new RssInvalidDataException("MapTaskCommitStatus should not be null");
}
this.commitTaskAttemptIds = new HashSet<>(this.commitMapTaskCommitStatus.getTaskAttemptIds().values());
if (!this.commitTaskAttemptIds.containsAll(fetchTaskAttemptIds)) {
throw new RssInvalidDataException(String.format(
"Task attempt ids not matched, committed: %s, fetching: %s",
this.commitTaskAttemptIds,
fetchTaskAttemptIds));
}
}
return connectDownloadResponse;
}