public ChannelFuture sendFiles()

in src/main/java/com/uber/rss/handlers/DownloadServerHandler.java [137:197]


    public ChannelFuture sendFiles(ChannelHandlerContext ctx, List<FilePathAndLength> nonEmptyFiles, ChannelIdleCheck idleCheck) {
        String connectionInfo = NettyUtils.getServerConnectionInfo(ctx);

        idleCheck.updateLastReadTime();

        ChannelFuture lastSendFileFuture = null;
        for (int i = 0; i < nonEmptyFiles.size(); i++) {
            final int fileIndex = i;
            String splitFile = nonEmptyFiles.get(fileIndex).getPath();
            long fileLength = nonEmptyFiles.get(fileIndex).getLength();
            logger.info(
                "Downloader server sending file: {} ({} of {}, {} bytes), {}",
                splitFile, fileIndex + 1, nonEmptyFiles.size(), fileLength, connectionInfo);
            // TODO support HDFS in future? need to remove code depending
            // on local file: new File(path)
            // TODO is storage.size(splitFile) reliable or consistent when finishing writing a file?
            DefaultFileRegion fileRegion = new DefaultFileRegion(
                new File(splitFile), 0, fileLength);
            ChannelFuture sendFileFuture = ctx.writeAndFlush(fileRegion,
                ctx.newProgressivePromise());
            int numConcurrentReadFilesValue = numConcurrentReadFilesAtomicInteger.incrementAndGet();
            numConcurrentReadFiles.update(numConcurrentReadFilesValue);
            final long sendFileStartTime = System.currentTimeMillis();
            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
                @Override
                public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                    executor.updateLiveness(appShuffleId.getAppId());
                    idleCheck.updateLastReadTime();
                    int numConcurrentReadFilesValue = numConcurrentReadFilesAtomicInteger.decrementAndGet();
                    numConcurrentReadFiles.update(numConcurrentReadFilesValue);
                    numReadFileBytes.inc(fileLength);
                    String exceptionInfo = "";
                    Throwable futureException = future.cause();
                    if (futureException != null) {
                        M3Stats.addException(futureException, M3Stats.TAG_VALUE_DOWNLOAD_PROCESSOR);
                        exceptionInfo = String.format(
                            ", exception: %s, %s",
                            com.uber.rss.util.ExceptionUtils.getSimpleMessage(future.cause()),
                            ExceptionUtils.getStackTrace(future.cause()));
                    }
                    double dataSpeed = LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - sendFileStartTime, fileLength);
                    logger.info(
                        "Finished sending file: {} ({} of {}), success: {} ({} mbs, total {} bytes), connection: {} {} {}",
                        splitFile, fileIndex + 1, nonEmptyFiles.size(), future.isSuccess(), dataSpeed, fileLength, connectionInfo, System.nanoTime(), exceptionInfo);
                }

                @Override
                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
                    double dataSpeed = LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - sendFileStartTime, progress);
                    logger.debug(
                        "Sending file: {}, progress: {} out of {} bytes, {} mbs, {}",
                        splitFile, progress, total, dataSpeed, connectionInfo);
                    executor.updateLiveness(appShuffleId.getAppId());
                    idleCheck.updateLastReadTime();
                }
            });
            lastSendFileFuture = sendFileFuture;
        }

        return lastSendFileFuture;
    }