in util/TransferLogManager.cpp [612:693]
void TransferLogManager::compactLog() {
auto fullLogPath = getFullPath(kWdtLogName);
WLOG(INFO) << "Started compacting transfer log " << fullLogPath;
if (fd_ < 0) {
WLOG(ERROR) << "Failed to compact transfer log because log handle has "
<< "been closed";
return;
}
// Shutdown writer thread to avoid race conditon against fd_
shutdownThread();
// Make sure log data is flushed.
fsyncLog();
if (::lseek(fd_, 0, SEEK_SET) < 0) {
WPLOG(ERROR) << "lseek failed for fd " << fd_;
return;
}
std::vector<FileChunksInfo> fileChunksInfoVec;
auto code = parseAndMatch(recoveryId_, config_, fileChunksInfoVec);
if (code != OK) {
WLOG(ERROR) << "Failed to parse " << fullLogPath << " "
<< errorCodeToStr(code);
return;
}
for (const auto &fileChunksInfo : fileChunksInfoVec) {
// Found multiple chunks for a file.
if (fileChunksInfo.getChunks().size() != 1) {
WLOG(ERROR) << "File " << fileChunksInfo.getFileName()
<< " has fragemented log entries";
return;
}
if (fileChunksInfo.getTotalChunkSize() != fileChunksInfo.getFileSize()) {
WLOG(ERROR) << "File " << fileChunksInfo.getFileName()
<< " has mismatched total chunk size and file size";
return;
}
}
WLOG(INFO) << "Successfully verified transfer log integrity";
if (::ftruncate(fd_, 0) != 0) {
WPLOG(ERROR) << "ftruncate failed for fd " << fd_;
return;
}
if (::lseek(fd_, 0, SEEK_SET) < 0) {
WPLOG(ERROR) << "lseek failed for fd " << fd_;
return;
}
writeLogHeader();
for (const auto &fileChunksInfo : fileChunksInfoVec) {
addFileCreationEntry(fileChunksInfo.getFileName(),
fileChunksInfo.getSeqId(),
fileChunksInfo.getFileSize());
addBlockWriteEntry(fileChunksInfo.getSeqId(), 0,
fileChunksInfo.getFileSize());
}
std::vector<string> entries;
{
std::lock_guard<std::mutex> lock(mutex_);
entries = entries_;
entries_.clear();
}
bool writeSuccess = writeEntriesToDiskNoLock(entries);
if (writeSuccess) {
WLOG(INFO) << "Finished compacting transfer log";
} else {
WLOG(ERROR) << "Failed compacting transfer log";
}
}