in util/DirectorySourceQueue.cpp [480:556]
/**
 * Splits the file described by `metadata` into block-sized byte sources and
 * pushes them onto sourceQueue_.
 *
 * Only the byte ranges that still have to be sent are queued: if
 * previouslyTransferredChunks_ has an entry for the file's relative path and
 * the recorded size does not exceed the current size, the ranges returned by
 * getRemainingChunks() are used; otherwise the whole file [0, size) is
 * queued. The chosen sequence id, previous sequence id and allocation status
 * are written back into `metadata` before any block is queued.
 *
 * If nothing remains to be sent the function returns early without touching
 * `metadata`, the queue or the entry/block counters (previouslySentBytes_ has
 * already been updated at that point).
 *
 * @param metadata  descriptor of the file to enqueue; its seqId, prevSeqId
 *                  and allocationStatus fields are filled in here, and the
 *                  same pointer is handed to every FileByteSource created.
 */
void DirectorySourceQueue::createIntoQueueInternal(SourceMetaData *metadata) {
  // TODO: small files (size below the block size) are currently handled as
  // blocks, and the file name travels in the header of every block of a large
  // file. Possible optimizations:
  // a) filesize < blocksize: leave block size and offset out of the header.
  //    Mostly useful for tiny files (0 to a few hundred bytes). Needs a
  //    separate header format and separate commands for files and blocks.
  // b) filesize > blocksize: send the file name only with the first block and
  //    a shorter header afterwards. The block size could also be dropped once
  //    negotiated, since it is more or less fixed.
  auto &fileSize = metadata->size;
  auto &relPath = metadata->relPath;

  int64_t configuredBlockBytes = blockSizeMbytes_ * 1024 * 1024;
  bool blocksEnabled = configuredBlockBytes > 0;
  if (!blocksEnabled) {
    WVLOG(2) << "Block transfer disabled for this transfer";
  }
  // With block transfer off, the file size doubles as the block size so that
  // every chunk ends up as exactly one block.
  auto blockSize = blocksEnabled ? configuredBlockBytes : fileSize;

  std::vector<Interval> pendingChunks;
  int64_t seqId;
  int64_t prevSeqId = 0;
  FileAllocationStatus allocationStatus;

  auto prior = previouslyTransferredChunks_.find(relPath);
  if (prior == previouslyTransferredChunks_.end()) {
    // Nothing recorded for this path: queue the whole file under a new id.
    pendingChunks.emplace_back(0, fileSize);
    seqId = nextSeqId_++;
    allocationStatus = NOT_EXISTS;
  } else {
    auto &priorInfo = prior->second;
    if (priorInfo.getFileSize() > fileSize) {
      // The recorded size is larger than the current file: queue the whole
      // file under a new id and remember the old id.
      pendingChunks.emplace_back(0, fileSize);
      seqId = nextSeqId_++;
      WLOG(INFO) << "File size is greater in the receiver side " << relPath
                 << " " << fileSize << " " << priorInfo.getFileSize();
      allocationStatus = EXISTS_TOO_LARGE;
      prevSeqId = priorInfo.getSeqId();
    } else {
      // Part of the file was sent in an earlier transfer. The sum of the
      // recorded chunk sizes is the number of bytes saved by resuming.
      previouslySentBytes_ += priorInfo.getTotalChunkSize();
      pendingChunks = priorInfo.getRemainingChunks(fileSize);
      if (pendingChunks.empty()) {
        WLOG(INFO) << relPath << " completely sent in previous transfer";
        return;
      }
      seqId = priorInfo.getSeqId();
      if (priorInfo.getFileSize() < fileSize) {
        allocationStatus = EXISTS_TOO_SMALL;
      } else {
        allocationStatus = EXISTS_CORRECT_SIZE;
      }
    }
  }

  metadata->seqId = seqId;
  metadata->prevSeqId = prevSeqId;
  metadata->allocationStatus = allocationStatus;

  int blockCount = 0;
  for (const auto &chunk : pendingChunks) {
    int64_t cursor = chunk.start_;
    int64_t bytesLeft = chunk.size();
    // The body runs at least once per chunk, so a zero-length chunk still
    // yields one (empty) block.
    for (;;) {
      const int64_t size = std::min<int64_t>(bytesLeft, blockSize);
      std::unique_ptr<ByteSource> block(
          std::make_unique<FileByteSource>(metadata, size, cursor));
      sourceQueue_.push(std::move(block));
      bytesLeft -= size;
      cursor += size;
      ++blockCount;
      if (!(bytesLeft > 0)) {
        break;
      }
    }
    totalFileSize_ += chunk.size();
  }

  ++numEntries_;
  numBlocks_ += blockCount;
  // Pass the number of freshly queued blocks on to smartNotify.
  smartNotify(blockCount);
}