void TransferLogManager::compactLog()

in util/TransferLogManager.cpp [612:693]


// Compacts the transfer log in place: the existing log is parsed and verified,
// then truncated and rewritten as a header followed by one file-creation entry
// and one block-write entry (offset 0, full file size) per parsed file.
//
// The function returns nothing; every outcome is reported through the log.
// Compaction is abandoned (early return) when:
//   - fd_ is negative,
//   - seeking to the start of the log fails,
//   - parseAndMatch() does not return OK,
//   - any parsed file has a chunk count other than one, or a total chunk size
//     different from its file size,
//   - truncating the log fails.
//
// Note: the writer thread is shut down before any of the file operations and
// is not restarted by this function, on either the success or failure paths.
// A failure after the ftruncate() call leaves the log truncated.
void TransferLogManager::compactLog() {
  auto fullLogPath = getFullPath(kWdtLogName);

  WLOG(INFO) << "Started compacting transfer log " << fullLogPath;

  if (fd_ < 0) {
    WLOG(ERROR) << "Failed to compact transfer log because log handle has "
                << "been closed";
    return;
  }

  // Shutdown writer thread to avoid race condition against fd_
  shutdownThread();

  // Make sure log data is flushed.
  fsyncLog();

  // Rewind so that the log is parsed from its beginning.
  if (::lseek(fd_, 0, SEEK_SET) < 0) {
    WPLOG(ERROR) << "lseek failed for fd " << fd_;
    return;
  }

  std::vector<FileChunksInfo> fileChunksInfoVec;
  auto code = parseAndMatch(recoveryId_, config_, fileChunksInfoVec);
  if (code != OK) {
    WLOG(ERROR) << "Failed to parse " << fullLogPath << " "
                << errorCodeToStr(code);
    return;
  }

  // Verify everything before touching the file: the rewrite below emits a
  // single block entry spanning [0, fileSize) per file, so each file must have
  // exactly one chunk and that chunk must account for the whole file size.
  for (const auto &fileChunksInfo : fileChunksInfoVec) {
    // Found multiple chunks for a file.
    if (fileChunksInfo.getChunks().size() != 1) {
      WLOG(ERROR) << "File " << fileChunksInfo.getFileName()
                  << " has fragemented log entries";
      return;
    }

    if (fileChunksInfo.getTotalChunkSize() != fileChunksInfo.getFileSize()) {
      WLOG(ERROR) << "File " << fileChunksInfo.getFileName()
                  << " has mismatched total chunk size and file size";
      return;
    }
  }

  WLOG(INFO) << "Successfully verified transfer log integrity";

  // Discard the old contents; from here on the old log is gone.
  if (::ftruncate(fd_, 0) != 0) {
    WPLOG(ERROR) << "ftruncate failed for fd " << fd_;
    return;
  }

  // ftruncate() does not move the file offset, so rewind explicitly before
  // writing the new contents.
  if (::lseek(fd_, 0, SEEK_SET) < 0) {
    WPLOG(ERROR) << "lseek failed for fd " << fd_;
    return;
  }

  writeLogHeader();

  // NOTE(review): these helpers presumably queue their records into entries_
  // (which is why entries_ is drained right after) -- confirm against their
  // definitions.
  for (const auto &fileChunksInfo : fileChunksInfoVec) {
    addFileCreationEntry(fileChunksInfo.getFileName(),
                         fileChunksInfo.getSeqId(),
                         fileChunksInfo.getFileSize());
    addBlockWriteEntry(fileChunksInfo.getSeqId(), 0,
                       fileChunksInfo.getFileSize());
  }

  std::vector<string> entries;

  {
    std::lock_guard<std::mutex> lock(mutex_);
    // Swap with the empty local vector instead of copy + clear(): entries_
    // ends up empty either way, but no entry strings are copied while the
    // mutex is held.
    entries.swap(entries_);
  }

  // The write itself happens outside the lock, on the drained entries.
  bool writeSuccess = writeEntriesToDiskNoLock(entries);
  if (writeSuccess) {
    WLOG(INFO) << "Finished compacting transfer log";
  } else {
    WLOG(ERROR) << "Failed compacting transfer log";
  }
}