in genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl.java [452:544]
private synchronized FileTransfer startFileTransfer(
final String jobId,
final DirectoryManifest.ManifestEntry manifestEntry,
final Path relativePath,
@Nullable final HttpRange range
) throws NotFoundException, LimitExceededException {
// Create a unique ID for this file transfer
final String fileTransferId = UUID.randomUUID().toString();
log.debug(
"Initiating transfer {} for file: {} of job: {}",
fileTransferId,
relativePath,
jobId
);
// No need to use a semaphore since class is synchronized
if (this.activeTransfers.size() >= properties.getMaxConcurrentTransfers()) {
log.warn("Rejecting request for {}:{}, too many active transfers", jobId, relativePath);
throw new LimitExceededException("Too many concurrent downloads");
}
final long fileSize = manifestEntry.getSize();
// Http range is inclusive, agent protocol is not.
// Convert from one to the other.
final long startOffset;
final long endOffset;
if (range == null) {
startOffset = 0;
endOffset = fileSize;
} else if (range.getClass() == this.suffixRangeClass) {
startOffset = range.getRangeStart(fileSize);
endOffset = fileSize;
} else {
startOffset = Math.min(fileSize, range.getRangeStart(fileSize));
endOffset = 1 + range.getRangeEnd(fileSize);
}
log.debug("Transfer {} effective range {}-{}: of job: {} ", fileTransferId, startOffset, endOffset, jobId);
// Allocate and park the buffer that will store the data in transit.
final StreamBuffer buffer = new StreamBuffer(startOffset);
// Create a file transfer
final FileTransfer fileTransfer = new FileTransfer(
fileTransferId,
jobId,
relativePath,
startOffset,
endOffset,
fileSize,
buffer
);
this.transferSizeDistribution.record(endOffset - startOffset);
if (endOffset - startOffset == 0) {
log.debug("Transfer {} is empty, completing of job: {}", fileTransferId, jobId);
// When requesting an empty file (or a range of 0 bytes), short-circuit and just return an empty
// buffer, without tracking it as active transfer.
buffer.closeForCompleted();
} else {
log.debug("Tracking new transfer {} of job: {}", fileTransferId, jobId);
// Expecting some data. Track this stream and its buffer so incoming chunks can be appended.
this.activeTransfers.put(fileTransferId, fileTransfer);
log.debug("Requesting start of transfer {} of job: {}", fileTransferId, jobId);
// Request file over control channel
try {
this.controlStreamsManager.requestFile(
jobId,
fileTransferId,
relativePath.toString(),
startOffset,
endOffset
);
} catch (IndexOutOfBoundsException | NotFoundException e) {
log.error(
"Failed to request file {}:{}, terminating transfer {}: {}",
jobId,
relativePath,
fileTransferId,
e.getMessage()
);
this.activeTransfers.remove(fileTransferId, fileTransfer);
buffer.closeForError(e);
throw e;
}
}
return fileTransfer;
}