public List getNonEmptyPartitionFiles()

in src/main/java/com/uber/rss/handlers/DownloadServerHandler.java [83:131]


    /**
     * Returns the persisted files for this handler's {@code appShuffleId} and {@code partitionId}
     * whose recorded length is greater than zero, after checking each of them against the storage.
     *
     * <p>For every returned entry the file exists in the storage, is not empty, and is at least as
     * large as the recorded persisted length.
     *
     * @param connectionInfoForLogging text describing the connection; only used in error messages
     * @return the entries with a positive recorded length, or an empty list when there are none
     * @throws RssInvalidStateException if the storage is not local storage, or if the sum of the
     *         recorded lengths does not fit in a {@code long}
     * @throws RssShuffleCorruptedException if a file with persisted bytes is missing, is empty, or
     *         is smaller than its recorded length
     */
    public List<FilePathAndLength> getNonEmptyPartitionFiles(String connectionInfoForLogging) {
        if (!storage.isLocalStorage()) {
            throw new RssInvalidStateException("Only local file storage is supported to download shuffle data, closing the connection");
        }

        // Entries without any recorded bytes are not worth serving, drop them up front.
        List<FilePathAndLength> persistedBytes = executor.getPersistedBytes(
            appShuffleId, partitionId)
            .stream()
            .filter(t -> t.getLength() > 0)
            .collect(Collectors.toList());

        if (persistedBytes.isEmpty()) {
            return Collections.emptyList();
        }

        // Cross-check the recorded lengths against what is actually present in the storage.
        for (FilePathAndLength filePathAndLength : persistedBytes) {
            if (!storage.exists(filePathAndLength.getPath())) {
                throw new RssShuffleCorruptedException(String.format(
                    "Shuffle file %s not found for partition %s, %s, %s, but there are persisted bytes: %s",
                    filePathAndLength.getPath(), partitionId, appShuffleId, connectionInfoForLogging, filePathAndLength.getLength()));
            }
            long fileSize = storage.size(filePathAndLength.getPath());
            if (fileSize <= 0) {
                throw new RssShuffleCorruptedException(String.format(
                    "Shuffle file %s is empty for partition %s, %s, %s, but there are persisted bytes: %s",
                    filePathAndLength.getPath(), partitionId, appShuffleId, connectionInfoForLogging, filePathAndLength.getLength()));
            }
            if (fileSize < filePathAndLength.getLength()) {
                throw new RssShuffleCorruptedException(String.format(
                    "Shuffle file %s has less size %s than expected %s for partition %s, %s, %s",
                    filePathAndLength.getPath(), fileSize, filePathAndLength.getLength(), partitionId, appShuffleId, connectionInfoForLogging));
            }
        }

        // Every remaining entry has a strictly positive length, so the total can never be zero.
        // The only way the total can be invalid is by overflowing a long, which is checked
        // explicitly here instead of relying on the wrapped sum happening to be negative.
        long totalFileLength = 0;
        for (FilePathAndLength filePathAndLength : persistedBytes) {
            long length = filePathAndLength.getLength();
            if (length > Long.MAX_VALUE - totalFileLength) {
                throw new RssInvalidStateException(String.format(
                    "Invalid total file length: sum of persisted bytes overflows for %s, %s",
                    StringUtils.join(persistedBytes, ','), connectionInfoForLogging));
            }
            totalFileLength += length;
        }

        // TODO verify there is no open files
        return persistedBytes;
    }