void DirectorySourceQueue::createIntoQueueInternal()

in util/DirectorySourceQueue.cpp [480:556]


void DirectorySourceQueue::createIntoQueueInternal(SourceMetaData *metadata) {
  // Cuts the file described by `metadata` into block-sized byte sources and
  // pushes them onto sourceQueue_, leaving out whatever
  // previouslyTransferredChunks_ already records for this path.
  //
  // TODO: files smaller than the block size are currently handled as blocks,
  // and every block of a large file carries the file name in its header.
  // Possible optimizations -
  // a) filesize < blocksize: leave block size and offset out of the header.
  // Mostly helps tiny files (0 to a few hundred bytes), but needs a separate
  // header format and separate commands for files and blocks.
  // b) filesize > blocksize: send the file name in the first block only and
  // use a shorter header for the following blocks. Block size could also be
  // dropped once negotiated, since it is more or less fixed.
  auto &totalBytes = metadata->size;
  auto &path = metadata->relPath;

  const int64_t configuredBlockBytes = blockSizeMbytes_ * 1024 * 1024;
  const bool blockModeOn = configuredBlockBytes > 0;
  int64_t blockBytes = configuredBlockBytes;
  if (!blockModeOn) {
    WVLOG(2) << "Block transfer disabled for this transfer";
    // Without block transfer the file size doubles as the block size, so a
    // chunk is never split into more than one block.
    blockBytes = totalBytes;
  }

  std::vector<Interval> pendingChunks;
  int64_t assignedSeqId;
  int64_t previousSeqId = 0;
  FileAllocationStatus status;

  auto priorIt = previouslyTransferredChunks_.find(path);
  if (priorIt == previouslyTransferredChunks_.end()) {
    // Nothing recorded for this path: send everything under a new seq-id.
    status = NOT_EXISTS;
    assignedSeqId = nextSeqId_++;
    pendingChunks.emplace_back(0, totalBytes);
  } else if (priorIt->second.getFileSize() > totalBytes) {
    // The recorded file is bigger than the local one: resend the whole file
    // under a new seq-id and keep the old seq-id in prevSeqId.
    assignedSeqId = nextSeqId_++;
    pendingChunks.emplace_back(0, totalBytes);
    WLOG(INFO) << "File size is greater in the receiver side " << path << " "
               << totalBytes << " " << priorIt->second.getFileSize();
    previousSeqId = priorIt->second.getSeqId();
    status = EXISTS_TOO_LARGE;
  } else {
    auto &priorInfo = priorIt->second;
    // Part of the file went out in earlier transfers. The total size of the
    // recorded chunks is counted as bytes saved by incremental download.
    previouslySentBytes_ += priorInfo.getTotalChunkSize();
    pendingChunks = priorInfo.getRemainingChunks(totalBytes);
    if (pendingChunks.empty()) {
      // Nothing left to send: metadata and all counters stay untouched.
      WLOG(INFO) << path << " completely sent in previous transfer";
      return;
    }
    assignedSeqId = priorInfo.getSeqId();
    if (priorInfo.getFileSize() < totalBytes) {
      status = EXISTS_TOO_SMALL;
    } else {
      status = EXISTS_CORRECT_SIZE;
    }
  }

  metadata->allocationStatus = status;
  metadata->prevSeqId = previousSeqId;
  metadata->seqId = assignedSeqId;

  int queuedBlocks = 0;
  for (const auto &chunk : pendingChunks) {
    int64_t cursor = chunk.start_;
    int64_t bytesLeft = chunk.size();
    // The exit test sits at the bottom so that every chunk, including a
    // zero-length one, contributes at least one source to the queue.
    while (true) {
      const int64_t pieceBytes = std::min<int64_t>(bytesLeft, blockBytes);
      std::unique_ptr<ByteSource> piece =
          std::make_unique<FileByteSource>(metadata, pieceBytes, cursor);
      sourceQueue_.push(std::move(piece));
      ++queuedBlocks;
      cursor += pieceBytes;
      bytesLeft -= pieceBytes;
      if (bytesLeft <= 0) {
        break;
      }
    }
    totalFileSize_ += chunk.size();
  }

  numEntries_++;
  numBlocks_ += queuedBlocks;
  smartNotify(queuedBlocks);
}