in src/main/java/com/uber/rss/clients/DataBlockSocketReadClient.java [169:233]
/**
 * Waits until the server reports that the data is available, then records the committed
 * map task status on this client.
 *
 * <p>The method repeatedly calls {@code getDataAvailability()} through
 * {@code RetryUtils.retryUntilNotNull}, passing {@code dataAvailablePollInterval},
 * {@code dataAvailablePollInterval * POLL_INTERVAL_MAX_MULTIPLIER} and
 * {@code dataAvailableWaitTime} (presumably the initial poll interval, the maximum poll
 * interval and the total wait budget in milliseconds; confirm against {@code RetryUtils}).
 * The time spent in that call is measured with the stopwatch obtained from
 * {@code metrics.getReducerWaitTime()}.
 *
 * <p>{@code commitMapTaskCommitStatus} and {@code commitTaskAttemptIds} are only assigned
 * after every validation below has passed, so a failed call leaves them untouched.
 *
 * @return the last response received, whose {@code isDataAvailable()} is true
 * @throws RssInvalidStateException if {@code commitMapTaskCommitStatus} is already set
 * @throws RssShuffleDataNotAvailableException if no response was captured or the last
 *     captured response does not report the data as available
 * @throws RssInvalidDataException if the response carries no map task commit status, or the
 *     committed task attempt ids do not contain all of {@code fetchTaskAttemptIds}
 */
public GetDataAvailabilityResponse waitDataAvailable() {
  if (this.commitMapTaskCommitStatus != null) {
    throw new RssInvalidStateException("Data already available, should not wait again");
  }

  long startTime = System.currentTimeMillis();
  logger.info("Waiting for all mappers finished: {}, {}", appShufflePartitionId, connectionInfo);

  Stopwatch reducerWaitTimeStopwatch = metrics.getReducerWaitTime().start();
  // Holds the most recent response so it can still be inspected for diagnostics when the
  // retry call ends without an "available" response.
  final ObjectWrapper<GetDataAvailabilityResponse> lastResponseHolder = new ObjectWrapper<>();
  try {
    RetryUtils.retryUntilNotNull(
        dataAvailablePollInterval,
        dataAvailablePollInterval * POLL_INTERVAL_MAX_MULTIPLIER,
        dataAvailableWaitTime,
        () -> {
          GetDataAvailabilityResponse response = getDataAvailability();
          lastResponseHolder.setObject(response);
          // Returning null asks the retry helper to try again.
          return response.isDataAvailable() ? response : null;
        });
  } finally {
    reducerWaitTimeStopwatch.stop();
  }

  logger.info("Finished waiting for all mappers to finish, partition: {}, duration: {} seconds",
      appShufflePartitionId, (System.currentTimeMillis() - startTime) / 1000);

  GetDataAvailabilityResponse lastResponse = lastResponseHolder.getObject();
  // Fail if the last captured status does not indicate that all mappers are finished.
  if (lastResponse == null || !lastResponse.isDataAvailable()) {
    throw new RssShuffleDataNotAvailableException(String.format(
        "Not all mappers finished after trying %s:%s for %s millis, partition: %s, %s",
        host, port, dataAvailableWaitTime, appShufflePartitionId,
        describeCommittedTaskAttempts(lastResponse)));
  }

  // Validate on locals first; the fields are published only once everything checks out, so
  // a failed validation cannot leave this client looking as if data were already available.
  MapTaskCommitStatus committedStatus = lastResponse.getMapTaskCommitStatus();
  if (committedStatus == null) {
    throw new RssInvalidDataException("MapTaskCommitStatus should not be null");
  }
  HashSet<Long> committedTaskAttemptIds = new HashSet<>(committedStatus.getTaskAttemptIds().values());
  if (!committedTaskAttemptIds.containsAll(fetchTaskAttemptIds)) {
    throw new RssInvalidDataException(String.format(
        "Task attempt ids not matched, committed: %s, fetching: %s",
        committedTaskAttemptIds,
        fetchTaskAttemptIds));
  }

  this.commitMapTaskCommitStatus = committedStatus;
  this.commitTaskAttemptIds = committedTaskAttemptIds;
  return lastResponse;
}

/**
 * Builds the task attempt id details appended to the "not all mappers finished" error
 * message to help debugging.
 *
 * @param response the last availability response captured while waiting; may be null
 * @return an empty string when there is no response or no commit status, a fixed note when
 *     no task attempt was committed, otherwise the sorted committed and fetching ids
 */
private String describeCommittedTaskAttempts(GetDataAvailabilityResponse response) {
  if (response == null || response.getMapTaskCommitStatus() == null) {
    return "";
  }
  MapTaskCommitStatus status = response.getMapTaskCommitStatus();
  if (status.getTaskAttemptIds().isEmpty()) {
    return "no task attempt committed";
  }
  List<Long> committedIds =
      status.getTaskAttemptIds().values().stream().sorted().collect(Collectors.toList());
  return String.format("committed task ids: %s, fetching tasks: %s",
      StringUtils.toString4SortedIntList(committedIds),
      StringUtils.toString4SortedIntList(fetchTaskAttemptIds.stream().sorted().collect(Collectors.toList())));
}