in src/main/java/com/uber/rss/handlers/DownloadServerHandler.java [83:131]
/**
 * Looks up the persisted files for the current {@code appShuffleId} / {@code partitionId},
 * keeps only those whose recorded length is positive, and checks each of them against the
 * storage before handing them back.
 *
 * @param connectionInfoForLogging connection description; only used in log and exception messages
 * @return the persisted files with a positive recorded length, or an empty list when there are none
 * @throws RssInvalidStateException if the storage is not local, or the summed recorded length
 *     is negative
 * @throws RssShuffleCorruptedException if a file does not exist, has a non-positive size, or is
 *     smaller on storage than its recorded length
 */
public List<FilePathAndLength> getNonEmptyPartitionFiles(String connectionInfoForLogging) {
  if (!storage.isLocalStorage()) {
    throw new RssInvalidStateException("Only local file storage is supported to download shuffle data, closing the connection");
  }

  // Entries with a zero (or negative) recorded length carry no data to download.
  List<FilePathAndLength> nonEmptyFiles = executor.getPersistedBytes(appShuffleId, partitionId)
      .stream()
      .filter(entry -> entry.getLength() > 0)
      .collect(Collectors.toList());
  if (nonEmptyFiles.isEmpty()) {
    return Collections.emptyList();
  }

  // Every remaining entry must be backed by a file holding at least the recorded bytes.
  for (FilePathAndLength entry : nonEmptyFiles) {
    String path = entry.getPath();
    long expectedLength = entry.getLength();

    if (!storage.exists(path)) {
      throw new RssShuffleCorruptedException(String.format(
          "Shuffle file %s not found for partition %s, %s, %s, but there are persisted bytes: %s",
          path, partitionId, appShuffleId, connectionInfoForLogging, expectedLength));
    }

    long actualSize = storage.size(path);
    if (actualSize <= 0) {
      throw new RssShuffleCorruptedException(String.format(
          "Shuffle file %s is empty for partition %s, %s, %s, but there are persisted bytes: %s",
          path, partitionId, appShuffleId, connectionInfoForLogging, expectedLength));
    }
    if (actualSize < expectedLength) {
      throw new RssShuffleCorruptedException(String.format(
          "Shuffle file %s has less size %s than expected %s for partition %s, %s, %s",
          path, actualSize, expectedLength, partitionId, appShuffleId, connectionInfoForLogging));
    }
  }

  // Plain long addition: wraps around on overflow, exactly like a summed LongStream.
  long expectedTotal = 0;
  for (FilePathAndLength entry : nonEmptyFiles) {
    expectedTotal += entry.getLength();
  }

  // All lengths are positive here, so a negative total can only come from overflow.
  if (expectedTotal < 0) {
    throw new RssInvalidStateException(String.format(
        "Invalid total file length: %s, %s",
        expectedTotal, connectionInfoForLogging));
  }
  if (expectedTotal == 0) {
    logger.info(
        "Total file length is zero: {}, {}",
        StringUtils.join(nonEmptyFiles, ','), connectionInfoForLogging);
    return Collections.emptyList();
  }

  // TODO verify there is no open files
  return nonEmptyFiles;
}