private synchronized FileTransfer startFileTransfer()

in genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl.java [452:544]


        /**
         * Set up a new transfer for (a range of) a job file and ask for it over the control channel.
         * <p>
         * The requested HTTP range (inclusive end) is converted into a start/end offset pair with an exclusive
         * end. A zero-length transfer is completed immediately and is never tracked as active. Otherwise the
         * transfer is registered in {@code activeTransfers} and the file is requested through
         * {@code controlStreamsManager}. If that request fails for any reason, the transfer is unregistered and
         * its buffer is closed with the error before the exception is propagated, so a failed request never keeps
         * occupying one of the concurrent transfer slots.
         *
         * @param jobId         the id of the job that owns the file
         * @param manifestEntry the manifest entry of the file; only its size is read here
         * @param relativePath  the path of the file, as sent in the file request
         * @param range         the requested HTTP range, or {@code null} to transfer the whole file
         * @return the newly created transfer (already completed if there are no bytes to transfer)
         * @throws NotFoundException      if propagated by the control streams manager when requesting the file
         * @throws LimitExceededException if the number of active transfers has reached the configured maximum
         */
        private synchronized FileTransfer startFileTransfer(
            final String jobId,
            final DirectoryManifest.ManifestEntry manifestEntry,
            final Path relativePath,
            @Nullable final HttpRange range
        ) throws NotFoundException, LimitExceededException {
            // Create a unique ID for this file transfer
            final String fileTransferId = UUID.randomUUID().toString();
            log.debug(
                "Initiating transfer {} for file: {} of job: {}",
                fileTransferId,
                relativePath,
                jobId
            );

            // No need to use a semaphore since this method is synchronized
            if (this.activeTransfers.size() >= properties.getMaxConcurrentTransfers()) {
                log.warn("Rejecting request for {}:{}, too many active transfers", jobId, relativePath);
                throw new LimitExceededException("Too many concurrent downloads");
            }

            final long fileSize = manifestEntry.getSize();

            // Http range is inclusive, agent protocol is not.
            // Convert from one to the other.
            final long startOffset;
            final long endOffset;

            if (range == null) {
                // No range: the whole file
                startOffset = 0;
                endOffset = fileSize;
            } else if (range.getClass() == this.suffixRangeClass) {
                // Suffix range ("last N bytes"): always extends to the end of the file
                startOffset = range.getRangeStart(fileSize);
                endOffset = fileSize;
            } else {
                // Clamp the start to the file size so a range starting past the end yields an empty transfer
                startOffset = Math.min(fileSize, range.getRangeStart(fileSize));
                // +1 converts the inclusive HTTP range end into an exclusive end offset
                endOffset = 1 + range.getRangeEnd(fileSize);
            }

            log.debug("Transfer {} effective range {}-{}: of job: {} ", fileTransferId, startOffset, endOffset, jobId);

            // Allocate and park the buffer that will store the data in transit.
            final StreamBuffer buffer = new StreamBuffer(startOffset);

            // Create a file transfer
            final FileTransfer fileTransfer = new FileTransfer(
                fileTransferId,
                jobId,
                relativePath,
                startOffset,
                endOffset,
                fileSize,
                buffer
            );

            this.transferSizeDistribution.record(endOffset - startOffset);

            if (endOffset - startOffset == 0) {
                log.debug("Transfer {} is empty, completing of job: {}", fileTransferId, jobId);
                // When requesting an empty file (or a range of 0 bytes), short-circuit and just return an empty
                // buffer, without tracking it as active transfer.
                buffer.closeForCompleted();
            } else {
                log.debug("Tracking new transfer {} of job: {}", fileTransferId, jobId);
                // Expecting some data. Track this stream and its buffer so incoming chunks can be appended.
                this.activeTransfers.put(fileTransferId, fileTransfer);

                log.debug("Requesting start of transfer {} of job: {}", fileTransferId, jobId);
                // Request file over control channel
                try {
                    this.controlStreamsManager.requestFile(
                        jobId,
                        fileTransferId,
                        relativePath.toString(),
                        startOffset,
                        endOffset
                    );
                } catch (IndexOutOfBoundsException | NotFoundException e) {
                    log.error(
                        "Failed to request file {}:{}, terminating transfer {}: {}",
                        jobId,
                        relativePath,
                        fileTransferId,
                        e.getMessage()
                    );
                    this.activeTransfers.remove(fileTransferId, fileTransfer);
                    buffer.closeForError(e);
                    throw e;
                } catch (final RuntimeException e) {
                    // Any other failure must also release the slot taken in activeTransfers and close the
                    // buffer; otherwise the entry stays registered and counts against the concurrency limit.
                    log.error(
                        "Unexpected error requesting file {}:{}, terminating transfer {}",
                        jobId,
                        relativePath,
                        fileTransferId,
                        e
                    );
                    this.activeTransfers.remove(fileTransferId, fileTransfer);
                    buffer.closeForError(e);
                    throw e;
                }
            }

            return fileTransfer;
        }