in src/main/java/com/uber/rss/handlers/DownloadServerHandler.java [137:197]
/**
 * Writes every file in {@code nonEmptyFiles} to the channel as a {@link DefaultFileRegion},
 * in list order, attaching a progress/completion listener to each write.
 *
 * <p>Each listener refreshes the application liveness and the idle check's last-read time,
 * maintains the concurrent-read gauge and the read-bytes counter, and logs the outcome.
 *
 * @param ctx           channel context the file regions are written and flushed to
 * @param nonEmptyFiles path/length pairs of the files to send
 * @param idleCheck     idle checker whose last-read time is refreshed when this method starts,
 *                      on every progress notification, and on completion of each write
 * @return the future of the last write issued, or {@code null} when {@code nonEmptyFiles}
 *         is empty
 */
public ChannelFuture sendFiles(ChannelHandlerContext ctx, List<FilePathAndLength> nonEmptyFiles, ChannelIdleCheck idleCheck) {
    final String connectionInfo = NettyUtils.getServerConnectionInfo(ctx);
    idleCheck.updateLastReadTime();

    ChannelFuture latestWrite = null;
    for (int index = 0; index < nonEmptyFiles.size(); index++) {
        final int fileNumber = index + 1;
        final FilePathAndLength entry = nonEmptyFiles.get(index);
        final String filePath = entry.getPath();
        final long byteCount = entry.getLength();

        logger.info(
            "Downloader server sending file: {} ({} of {}, {} bytes), {}",
            filePath, fileNumber, nonEmptyFiles.size(), byteCount, connectionInfo);

        // TODO: supporting HDFS later would require dropping the local-file dependency
        // (new File(path)) used below.
        // TODO: is storage.size(splitFile) reliable or consistent when finishing writing a file?
        final DefaultFileRegion region = new DefaultFileRegion(new File(filePath), 0, byteCount);
        final ChannelFuture writeFuture = ctx.writeAndFlush(region, ctx.newProgressivePromise());

        // The gauge is bumped after the write is issued and before the listener is attached,
        // so the decrement in operationComplete always follows this increment.
        numConcurrentReadFiles.update(numConcurrentReadFilesAtomicInteger.incrementAndGet());

        final long startMillis = System.currentTimeMillis();
        writeFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
                final double megaBytesPerSecond =
                    LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - startMillis, progress);
                logger.debug(
                    "Sending file: {}, progress: {} out of {} bytes, {} mbs, {}",
                    filePath, progress, total, megaBytesPerSecond, connectionInfo);
                executor.updateLiveness(appShuffleId.getAppId());
                idleCheck.updateLastReadTime();
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                executor.updateLiveness(appShuffleId.getAppId());
                idleCheck.updateLastReadTime();

                numConcurrentReadFiles.update(numConcurrentReadFilesAtomicInteger.decrementAndGet());
                numReadFileBytes.inc(byteCount);

                // Stays empty on success; otherwise carries the failure message and stack trace.
                String failureDetails = "";
                final Throwable failure = future.cause();
                if (failure != null) {
                    M3Stats.addException(failure, M3Stats.TAG_VALUE_DOWNLOAD_PROCESSOR);
                    failureDetails = String.format(
                        ", exception: %s, %s",
                        com.uber.rss.util.ExceptionUtils.getSimpleMessage(failure),
                        ExceptionUtils.getStackTrace(failure));
                }

                final double megaBytesPerSecond =
                    LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - startMillis, byteCount);
                logger.info(
                    "Finished sending file: {} ({} of {}), success: {} ({} mbs, total {} bytes), connection: {} {} {}",
                    filePath, fileNumber, nonEmptyFiles.size(), future.isSuccess(), megaBytesPerSecond,
                    byteCount, connectionInfo, System.nanoTime(), failureDetails);
            }
        });

        latestWrite = writeFuture;
    }
    return latestWrite;
}