util/TransferLogManager.cpp (991 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/util/TransferLogManager.h>
#include <wdt/util/SerializationUtil.h>
#include <fcntl.h>
#include <folly/lang/Bits.h>
#include <folly/Conv.h>
#include <folly/Range.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <wdt/Reporting.h>
#include <ctime>
#include <iomanip>
using folly::ByteRange;
using std::string;
namespace facebook {
namespace wdt {
// TODO consider revamping this log format
// Version of the on-disk log format. It is written into every log header
// (encodeLogHeader) and processHeaderEntry rejects a log whose header carries
// any other version as INVALID_LOG, so bump it whenever the format changes.
const int TransferLogManager::WLOG_VERSION = 2;
// Returns the current Clock time as microseconds since Clock's epoch; used to
// stamp every encoded log entry.
int64_t LogEncoderDecoder::timestampInMicroseconds() const {
  const auto sinceEpoch = Clock::now().time_since_epoch();
  const auto micros =
      std::chrono::duration_cast<std::chrono::microseconds>(sinceEpoch);
  return micros.count();
}
int64_t LogEncoderDecoder::encodeLogHeader(char *dest, int64_t max,
const string &recoveryId,
const string &senderIp,
int64_t config) {
// increment by 2 bytes to later store the total length
char *ptr = dest + sizeof(int16_t);
max -= sizeof(int16_t);
int64_t size = 0;
WDT_CHECK_GE(max, 1);
ptr[size++] = TransferLogManager::HEADER;
bool ok = encodeVarI64C(ptr, max, size, timestampInMicroseconds()) &&
encodeVarI64C(ptr, max, size, TransferLogManager::WLOG_VERSION) &&
encodeString(ptr, max, size, recoveryId) &&
encodeString(ptr, max, size, senderIp) &&
encodeVarI64C(ptr, max, size, config);
if (!ok) {
WLOG(ERROR) << "Log header buffer too small " << max << " for header "
<< recoveryId << " , " << senderIp;
return -1;
}
folly::storeUnaligned<int16_t>(dest, size);
return (size + sizeof(int16_t));
}
// Decodes the payload of a HEADER entry, i.e. the bytes that follow the entry
// type byte, as produced by encodeLogHeader().
// Outputs: timestamp, log format version, recovery-id, sender ip and config.
// Returns true only if every field decodes and exactly `size` bytes are
// consumed; otherwise logs and returns false.
bool LogEncoderDecoder::decodeLogHeader(char *buf, int16_t size,
                                        int64_t &timestamp, int &version,
                                        string &recoveryId, string &senderIp,
                                        int64_t &config) {
  ByteRange br = makeByteRange(buf, size);
  const bool ok = decodeInt64C(br, timestamp) && decodeInt32C(br, version) &&
                  decodeString(br, recoveryId) && decodeString(br, senderIp) &&
                  decodeInt64C(br, config);
  // Leftover bytes mean the entry does not match the expected layout.
  if (!ok || br.size() != 0) {
    WLOG(ERROR) << "Error decoding log header size " << size << " ok " << ok
                << " left over " << br.size();
    return false;
  }
  return true;
}
// Serializes a FILE_CREATION entry into `dest` (capacity `max` bytes):
//   [int16 payload length][FILE_CREATION][timestamp][fileName][seqId][fileSize]
// Returns the total number of bytes written (length prefix included), or -1
// if the entry does not fit.
int64_t LogEncoderDecoder::encodeFileCreationEntry(char *dest, int64_t max,
                                                   const string &fileName,
                                                   const int64_t seqId,
                                                   const int64_t fileSize) {
  // Skip the 2-byte length prefix for now; it is filled in once the payload
  // length is known.
  int64_t offset = sizeof(int16_t);
  WDT_CHECK_GE(max, offset + 1);
  dest[offset++] = TransferLogManager::FILE_CREATION;
  if (!encodeVarI64C(dest, max, offset, timestampInMicroseconds()) ||
      !encodeString(dest, max, offset, fileName) ||
      !encodeVarI64C(dest, max, offset, seqId) ||
      !encodeVarI64C(dest, max, offset, fileSize)) {
    WLOG(ERROR) << "Log header buffer too small " << max << " for file c entry "
                << fileName;
    return -1;
  }
  const int64_t payloadLength = offset - sizeof(int16_t);
  folly::storeUnaligned<int16_t>(dest, payloadLength);
  return offset;
}
// Decodes the payload of a FILE_CREATION entry, i.e. the bytes after the type
// byte, as produced by encodeFileCreationEntry().
// Returns true only if all fields decode and exactly `size` bytes are
// consumed; otherwise logs and returns false.
bool LogEncoderDecoder::decodeFileCreationEntry(char *buf, int16_t size,
                                                int64_t &timestamp,
                                                string &fileName,
                                                int64_t &seqId,
                                                int64_t &fileSize) {
  ByteRange br = makeByteRange(buf, size);
  const bool ok = decodeInt64C(br, timestamp) && decodeString(br, fileName) &&
                  decodeInt64C(br, seqId) && decodeInt64C(br, fileSize);
  if (!ok || (br.size() != 0)) {
    WLOG(ERROR) << "Did not decode properly file creat entry " << size << " ok "
                << ok << " left over " << br.size();
    return false;
  }
  return true;
}
// Serializes a BLOCK_WRITE entry into `dest` (capacity `max` bytes):
//   [int16 payload length][BLOCK_WRITE][timestamp][seqId][offset][blockSize]
// Returns the total number of bytes written (length prefix included), or -1
// if the entry does not fit.
int64_t LogEncoderDecoder::encodeBlockWriteEntry(char *dest, int64_t max,
                                                 const int64_t seqId,
                                                 const int64_t offset,
                                                 const int64_t blockSize) {
  // Payload starts after the 2-byte length prefix.
  int64_t pos = sizeof(int16_t);
  WDT_CHECK_GE(max, pos + 1);
  dest[pos++] = TransferLogManager::BLOCK_WRITE;
  if (!encodeVarI64C(dest, max, pos, timestampInMicroseconds()) ||
      !encodeVarI64C(dest, max, pos, seqId) ||
      !encodeVarI64C(dest, max, pos, offset) ||
      !encodeVarI64C(dest, max, pos, blockSize)) {
    WLOG(ERROR) << "Failed to encode blockwrite entry into " << max;
    return -1;
  }
  const int64_t payloadLength = pos - sizeof(int16_t);
  folly::storeUnaligned<int16_t>(dest, payloadLength);
  return pos;
}
// Decodes the payload of a BLOCK_WRITE entry, i.e. the bytes after the type
// byte, as produced by encodeBlockWriteEntry().
// Returns true only if all fields decode and exactly `size` bytes are
// consumed; otherwise logs and returns false.
bool LogEncoderDecoder::decodeBlockWriteEntry(char *buf, int16_t size,
                                              int64_t &timestamp,
                                              int64_t &seqId, int64_t &offset,
                                              int64_t &blockSize) {
  ByteRange br = makeByteRange(buf, size);
  const bool ok = decodeInt64C(br, timestamp) && decodeInt64C(br, seqId) &&
                  decodeInt64C(br, offset) && decodeInt64C(br, blockSize);
  if (!ok || (br.size() != 0)) {
    WLOG(ERROR) << "Did not decode properly block write entry " << size
                << " ok " << ok << " left over " << br.size();
    return false;
  }
  return true;
}
int64_t LogEncoderDecoder::encodeFileResizeEntry(char *dest, int64_t max,
const int64_t seqId,
const int64_t fileSize) {
int64_t size = sizeof(int16_t);
WDT_CHECK_GE(max, size + 1);
dest[size++] = TransferLogManager::FILE_RESIZE;
bool ok = encodeVarI64C(dest, max, size, timestampInMicroseconds()) &&
encodeVarI64C(dest, max, size, seqId) &&
encodeVarI64C(dest, max, size, fileSize);
if (!ok) {
return -1;
}
folly::storeUnaligned<int16_t>(dest, size - sizeof(int16_t));
return size;
}
bool LogEncoderDecoder::decodeFileResizeEntry(char *buf, int16_t size,
int64_t ×tamp,
int64_t &seqId,
int64_t &fileSize) {
ByteRange br = makeByteRange(buf, size);
bool ok = decodeInt64C(br, timestamp) && decodeInt64C(br, seqId) &&
decodeInt64C(br, fileSize);
if (!ok || (br.size() != 0)) {
WLOG(ERROR) << "Did not decode properly block write entry " << size
<< " ok " << ok << " left over " << br.size();
return false;
}
return true;
}
// Serializes a FILE_INVALIDATION entry into `dest` (capacity `max` bytes):
//   [int16 payload length][FILE_INVALIDATION][timestamp][seqId]
// Returns the total number of bytes written (length prefix included), or -1
// if the entry does not fit.
int64_t LogEncoderDecoder::encodeFileInvalidationEntry(char *dest, int64_t max,
                                                       const int64_t &seqId) {
  int64_t cursor = sizeof(int16_t);  // leave room for the length prefix
  WDT_CHECK_GE(max, cursor + 1);
  dest[cursor++] = TransferLogManager::FILE_INVALIDATION;
  const bool encoded =
      encodeVarI64C(dest, max, cursor, timestampInMicroseconds()) &&
      encodeVarI64C(dest, max, cursor, seqId);
  if (encoded) {
    folly::storeUnaligned<int16_t>(dest, cursor - sizeof(int16_t));
    return cursor;
  }
  WLOG(ERROR) << "Failed to encode inval entry into " << max;
  return -1;
}
// Decodes the payload of a FILE_INVALIDATION entry, i.e. the bytes after the
// type byte, as produced by encodeFileInvalidationEntry().
// Returns true only if both fields decode and exactly `size` bytes are
// consumed; otherwise logs and returns false.
bool LogEncoderDecoder::decodeFileInvalidationEntry(char *buf, int16_t size,
                                                    int64_t &timestamp,
                                                    int64_t &seqId) {
  ByteRange br = makeByteRange(buf, size);
  const bool ok = decodeInt64C(br, timestamp) && decodeInt64C(br, seqId);
  if (!ok || (br.size() != 0)) {
    WLOG(ERROR) << "Did not decode properly file inval entry " << size << " ok "
                << ok << " left over " << br.size();
    return false;
  }
  return true;
}
// Serializes a DIRECTORY_INVALIDATION entry into `dest` (capacity `max`):
//   [int16 payload length][DIRECTORY_INVALIDATION][timestamp]
// Returns the total number of bytes written (length prefix included), or -1
// if the entry does not fit.
int64_t LogEncoderDecoder::encodeDirectoryInvalidationEntry(char *dest,
                                                            int64_t max) {
  int64_t written = sizeof(int16_t);  // length prefix is filled in last
  WDT_CHECK_GE(max, written + 1);
  dest[written++] = TransferLogManager::DIRECTORY_INVALIDATION;
  const bool encoded =
      encodeVarI64C(dest, max, written, timestampInMicroseconds());
  if (!encoded) {
    WLOG(ERROR) << "No room in " << max << " for dir inval entry";
    return -1;
  }
  const int64_t payloadLength = written - sizeof(int16_t);
  folly::storeUnaligned<int16_t>(dest, payloadLength);
  return written;
}
// TODO make the caller make and pass the byterange
// Decodes the payload of a DIRECTORY_INVALIDATION entry, i.e. the bytes after
// the type byte, which is just the timestamp.
// Returns true only if the timestamp decodes and exactly `size` bytes are
// consumed; otherwise logs and returns false.
bool LogEncoderDecoder::decodeDirectoryInvalidationEntry(char *buf,
                                                         int16_t size,
                                                         int64_t &timestamp) {
  ByteRange br = makeByteRange(buf, size);
  const bool ok = decodeInt64C(br, timestamp);
  if (!ok || (br.size() != 0)) {
    WLOG(ERROR) << "Did not decode properly dir inval entry " << size << " ok "
                << ok << " left over " << br.size();
    return false;
  }
  return true;
}
// Returns rootDir_ with `relPath` appended verbatim. No separator is inserted,
// so rootDir_ presumably already ends with '/' (TODO confirm at the setter).
// Aborts if the root directory has not been set.
string TransferLogManager::getFullPath(const string &relPath) {
  WDT_CHECK(!rootDir_.empty()) << "Root directory not set";
  return rootDir_ + relPath;
}
// Opens the transfer log (kWdtLogName under rootDir_) read-write, creating it
// first if it does not exist, and then takes an exclusive non-blocking flock.
// Preconditions (WDT_CHECKed): no log is open, rootDir_ is set and download
// resumption is enabled.
// Returns OK on success. Returns TRANSFER_LOG_ACQUIRE_ERROR if the log cannot
// be opened or created, or if the lock cannot be acquired; in the lock case the
// descriptor is closed again.
ErrorCode TransferLogManager::openLog() {
WDT_CHECK(fd_ < 0);
WDT_CHECK(!rootDir_.empty()) << "Root directory not set";
WDT_CHECK(options_.enable_download_resumption);
const string logPath = getFullPath(kWdtLogName);
// Common case: the log already exists.
fd_ = ::open(logPath.c_str(), O_RDWR);
if (fd_ < 0) {
// errno is inspected before any other call can clobber it.
if (errno != ENOENT) {
WPLOG(ERROR) << "Could not open wdt log " << logPath;
return TRANSFER_LOG_ACQUIRE_ERROR;
} else {
// creation of the log path (which can still be a race)
WLOG(INFO) << logPath << " doesn't exist... creating...";
// O_EXCL: only one racing creator succeeds; losers just fall through to
// the reopen below.
fd_ = ::open(logPath.c_str(), O_CREAT | O_EXCL, 0644);
if (fd_ < 0) {
WPLOG(WARNING) << "Could not create wdt log (maybe ok if race): "
<< logPath;
} else {
// On windows/cygwin for instance the flock will silently succeed yet
// not lock on a newly created file... workaround is to close and reopen
::close(fd_);
}
// Whether or not this process created the file, reopen it read-write.
fd_ = ::open(logPath.c_str(), O_RDWR);
if (fd_ < 0) {
WPLOG(ERROR) << "Still couldn't open wdt log after create attempt: "
<< logPath;
return TRANSFER_LOG_ACQUIRE_ERROR;
}
}
}
// try to acquire file lock
// LOCK_NB: fail immediately instead of blocking if the lock is already held.
if (::flock(fd_, LOCK_EX | LOCK_NB) != 0) {
WPLOG(ERROR) << "Failed to acquire transfer log lock " << logPath << " "
<< fd_;
close();
return TRANSFER_LOG_ACQUIRE_ERROR;
}
WLOG(INFO) << "Transfer log opened and lock acquired on " << logPath;
return OK;
}
// Starts the background writer thread that flushes queued entries to the log.
// Directory-tree based resumption does not use the log writer, so nothing is
// started in that mode. If the current resumption status is not OK, no thread
// is started and that status is returned.
ErrorCode TransferLogManager::startThread() {
  if (options_.resume_using_dir_tree) {
    return OK;
  }
  if (resumptionStatus_ != OK) {
    return resumptionStatus_;
  }
  writerThread_ =
      std::thread(&TransferLogManager::threadProcWriteEntriesToDisk, this);
  WLOG(INFO) << "Log writer thread started";
  return OK;
}
// Signals the writer thread to finish and joins it. This is a no-op if no
// writer thread is running. The writer flushes its pending entries before
// exiting.
void TransferLogManager::shutdownThread() {
  if (!writerThread_.joinable()) {
    return;
  }
  {
    std::lock_guard<std::mutex> guard(mutex_);
    finished_ = true;
  }
  conditionFinished_.notify_one();
  writerThread_.join();
}
// Sanity-checks that the open log descriptor still refers to the file at the
// log path. Returns ERROR if no log is open and OK when the check passes.
// If the path cannot be stat'ed, the descriptor cannot be fstat'ed, or the two
// refer to different files, the process is terminated with
// TRANSFER_LOG_ACQUIRE_ERROR because the log is no longer trustworthy.
ErrorCode TransferLogManager::checkLog() {
  if (fd_ < 0) {
    WLOG(WARNING) << "No log to check";
    return ERROR;
  }
  const string fullLogName = getFullPath(kWdtLogName);
  struct stat stat1, stat2;
  if (stat(fullLogName.c_str(), &stat1)) {
    WPLOG(ERROR) << "CORRUPTION! Can't stat log file " << fullLogName
                 << " (deleted under us)";
    exit(TRANSFER_LOG_ACQUIRE_ERROR);  // does not return
  }
  if (fstat(fd_, &stat2)) {
    WPLOG(ERROR) << "Unable to stat log by fd " << fd_;
    exit(TRANSFER_LOG_ACQUIRE_ERROR);  // does not return
  }
  // Inode numbers are only unique within one filesystem, so a file is
  // identified by the (st_dev, st_ino) pair.
  if (stat1.st_ino != stat2.st_ino || stat1.st_dev != stat2.st_dev) {
    WLOG(ERROR) << "CORRUPTION! log file " << fullLogName << " changed "
                << " old/open fd inode " << stat2.st_ino << " dev "
                << stat2.st_dev << " on fs " << stat1.st_ino << " dev "
                << stat1.st_dev;
    exit(TRANSFER_LOG_ACQUIRE_ERROR);  // does not return
  }
  WLOG(INFO) << fullLogName << " still ok with " << stat1.st_ino;
  return OK;
}
// Closes the log descriptor, if open, after verifying with checkLog() that it
// still refers to the on-disk log. Always resets fd_ to -1 afterwards. This
// does not stop the writer thread; closeLog() does that.
void TransferLogManager::close() {
  if (fd_ >= 0) {
    checkLog();
    const int rc = ::close(fd_);
    if (rc == 0) {
      WLOG(INFO) << "Transfer log closed";
    } else {
      WPLOG(ERROR) << "Failed to close wdt log " << fd_;
    }
    fd_ = -1;
  }
}
// Fully shuts the log down: stops the writer thread (only started in log-based
// resumption mode) and then closes the descriptor. No-op if no log is open.
void TransferLogManager::closeLog() {
  if (fd_ >= 0) {
    if (!options_.resume_using_dir_tree) {
      shutdownThread();
    }
    close();
  }
}
// The destructor does not release the log itself: callers must call
// closeLog() first, and destroying an instance with the log still open is
// treated as a programming error (WDT_CHECK failure).
TransferLogManager::~TransferLogManager() {
WDT_CHECK_LT(fd_, 0) << "Destructor called, but transfer log not closed";
}
// Concatenates `entries` and appends them to the log at the current file
// offset. "NoLock" means mutex_ is not taken; callers pass a private copy of
// the queued entries. Returns true when everything was written (or there was
// nothing to write) and false on a write error.
// Short writes and EINTR are retried instead of being treated as fatal, since
// POSIX write() may legitimately write fewer bytes than requested.
bool TransferLogManager::writeEntriesToDiskNoLock(
    const std::vector<std::string> &entries) {
  size_t totalSize = 0;
  for (const auto &entry : entries) {
    totalSize += entry.size();
  }
  if (totalSize == 0) {
    // do not write when there is nothing to write
    return true;
  }
  string buffer;
  buffer.reserve(totalSize);
  for (const auto &entry : entries) {
    buffer.append(entry);
  }
  const char *data = buffer.data();
  const int64_t toWrite = buffer.size();
  int64_t written = 0;
  while (written < toWrite) {
    auto ret = ::write(fd_, data + written, toWrite - written);
    if (ret < 0 && errno == EINTR) {
      continue;
    }
    if (ret <= 0) {
      WPLOG(ERROR) << "Disk write error while writing transfer log " << written
                   << " " << toWrite;
      return false;
    }
    written += ret;
  }
  return true;
}
void TransferLogManager::threadProcWriteEntriesToDisk() {
WDT_CHECK(fd_ >= 0) << "Writer thread started before the log is opened";
WLOG(INFO) << "Transfer log writer thread started";
WDT_CHECK(options_.transfer_log_write_interval_ms >= 0);
auto waitingTime =
std::chrono::milliseconds(options_.transfer_log_write_interval_ms);
std::vector<string> entries;
bool finished = false;
while (!finished) {
{
std::unique_lock<std::mutex> lock(mutex_);
// conditionFinished_ will put writer thread to sleep. However, writer
// thread wakes up at interval of "watingTime" (in ms) to write log
// entries to disk. When "shutdownThread" is called, writer thread will
// wake up immediately and exit after writing current log entries to disk
conditionFinished_.wait_for(lock, waitingTime);
finished = finished_;
// make a copy of all the entries so that we do not need to hold lock
// during writing
entries = entries_;
entries_.clear();
}
if (!writeEntriesToDiskNoLock(entries)) {
return;
}
}
WLOG(INFO) << "Transfer log writer thread finished";
}
// Compares the current sender ip with the one recorded in the log header.
// A mismatch invalidates the directory and returns false. Verification is
// skipped, returning true, when it is disabled by option or no ip was recorded.
// In every case except a closed log, senderIp_ is updated to curSenderIp.
// Returns false if the log is not open.
bool TransferLogManager::verifySenderIp(const string &curSenderIp) {
  if (fd_ < 0) {
    return false;
  }
  bool verified = true;
  if (options_.disable_sender_verification_during_resumption) {
    WLOG(WARNING) << "Sender-ip verification disabled " << senderIp_ << " "
                  << curSenderIp;
  } else if (senderIp_.empty()) {
    WLOG(INFO) << "Sender-ip empty, not verifying sender-ip, new-ip: "
               << curSenderIp;
  } else if (senderIp_ != curSenderIp) {
    WLOG(ERROR) << "Current sender ip does not match ip in the "
                   "transfer log "
                << curSenderIp << " " << senderIp_
                << ", ignoring transfer log";
    verified = false;
    invalidateDirectory();
  }
  senderIp_ = curSenderIp;
  return verified;
}
// Flushes the log file to stable storage. Requires an open log (WDT_CHECK).
// An fsync failure is logged but not propagated.
void TransferLogManager::fsyncLog() {
  WDT_CHECK(fd_ >= 0);
  const int rc = ::fsync(fd_);
  if (rc != 0) {
    WPLOG(ERROR) << "fsync failed for transfer log " << fd_;
  }
}
void TransferLogManager::invalidateDirectory() {
if (fd_ < 0) {
return;
}
WLOG(WARNING) << "Invalidating directory " << rootDir_;
resumptionStatus_ = INCONSISTENT_DIRECTORY;
char buf[kMaxEntryLength];
int64_t size =
encoderDecoder_.encodeDirectoryInvalidationEntry(buf, sizeof(buf));
int64_t written = ::write(fd_, buf, size);
if (written != size) {
WPLOG(ERROR)
<< "Disk write error while writing directory invalidation entry "
<< written << " " << size;
closeLog();
}
fsyncLog();
return;
}
void TransferLogManager::writeLogHeader() {
if (fd_ < 0) {
return;
}
WLOG(INFO) << "Writing log header, version: " << WLOG_VERSION
<< " recovery-id: " << recoveryId_ << " config: " << config_
<< " sender-ip: " << senderIp_;
char buf[kMaxEntryLength];
int64_t size = encoderDecoder_.encodeLogHeader(
buf, kMaxEntryLength, recoveryId_, senderIp_, config_);
int64_t written = ::write(fd_, buf, size);
if (written != size) {
WPLOG(ERROR) << "Disk write error while writing log header " << written
<< " " << size;
closeLog();
return;
}
fsyncLog();
// header signifies a valid directory state
resumptionStatus_ = OK;
headerWritten_ = true;
}
void TransferLogManager::addFileCreationEntry(const string &fileName,
int64_t seqId, int64_t fileSize) {
if (fd_ < 0 || !headerWritten_) {
return;
}
WVLOG(1) << "Adding file entry to log " << fileName << " " << seqId << " "
<< fileSize;
char buf[kMaxEntryLength];
int64_t size = encoderDecoder_.encodeFileCreationEntry(
buf, sizeof(buf), fileName, seqId, fileSize);
std::lock_guard<std::mutex> lock(mutex_);
entries_.emplace_back(buf, size);
}
void TransferLogManager::addBlockWriteEntry(int64_t seqId, int64_t offset,
int64_t blockSize) {
if (fd_ < 0 || !headerWritten_) {
return;
}
WVLOG(1) << "Adding block entry to log " << seqId << " " << offset << " "
<< blockSize;
char buf[kMaxEntryLength];
int64_t size = encoderDecoder_.encodeBlockWriteEntry(buf, sizeof(buf), seqId,
offset, blockSize);
std::lock_guard<std::mutex> lock(mutex_);
entries_.emplace_back(buf, size);
}
void TransferLogManager::addFileResizeEntry(int64_t seqId, int64_t fileSize) {
if (fd_ < 0 || !headerWritten_) {
return;
}
WVLOG(1) << "Adding file resize entry to log " << seqId << " " << fileSize;
char buf[kMaxEntryLength];
int64_t size =
encoderDecoder_.encodeFileResizeEntry(buf, sizeof(buf), seqId, fileSize);
std::lock_guard<std::mutex> lock(mutex_);
entries_.emplace_back(buf, size);
}
void TransferLogManager::addFileInvalidationEntry(int64_t seqId) {
if (fd_ < 0) {
return;
}
WLOG(INFO) << "Adding invalidation entry " << seqId;
char buf[kMaxEntryLength];
int64_t size =
encoderDecoder_.encodeFileInvalidationEntry(buf, kMaxEntryLength, seqId);
std::lock_guard<std::mutex> lock(mutex_);
entries_.emplace_back(buf, size);
}
void TransferLogManager::unlink() {
WDT_CHECK_LT(fd_, 0) << "Unlink called before closeLog!";
WLOG(INFO) << "unlinking " << kWdtLogName;
string fullLogName = getFullPath(kWdtLogName);
if (::unlink(fullLogName.c_str()) != 0) {
WPLOG(ERROR) << "Could not unlink " << fullLogName;
}
}
void TransferLogManager::renameBuggyLog() {
WDT_CHECK_LT(fd_, 0) << "renameBuggyLog called before closeLog!";
WLOG(INFO) << "Renaming " << kWdtLogName << " to " << kWdtBuggyLogName;
if (::rename(getFullPath(kWdtLogName).c_str(),
getFullPath(kWdtBuggyLogName).c_str()) != 0) {
WPLOG(ERROR) << "log rename failed " << kWdtLogName << " "
<< kWdtBuggyLogName;
}
return;
}
// Returns the current resumption status. It is updated by log parsing
// (parseVerifyAndFix), by invalidateDirectory() (INCONSISTENT_DIRECTORY) and
// by a successful writeLogHeader() (OK).
ErrorCode TransferLogManager::getResumptionStatus() {
return resumptionStatus_;
}
bool TransferLogManager::parseAndPrint() {
std::vector<FileChunksInfo> parsedInfo;
return parseVerifyAndFix("", 0, true, parsedInfo) == OK;
}
// Parses the log and verifies it against `recoveryId` and `config`, filling
// `fileChunksInfo` with the resumable file state. The ids are stored in
// recoveryId_/config_ so later header writes (writeLogHeader) use them.
ErrorCode TransferLogManager::parseAndMatch(
    const string &recoveryId, int64_t config,
    std::vector<FileChunksInfo> &fileChunksInfo) {
  recoveryId_ = recoveryId;
  config_ = config;
  const bool parseOnly = false;
  return parseVerifyAndFix(recoveryId_, config, parseOnly, fileChunksInfo);
}
// Runs a LogParser over the open log and stores the result in
// resumptionStatus_, which is also returned. Returns INVALID_LOG without
// parsing if no log is open.
//  - INVALID_LOG: the log is closed but left on disk, so the directory stays
//    invalid and nothing more is written to it.
//  - INCONSISTENT_DIRECTORY: a directory invalidation entry is written, unless
//    in parse-only mode, which never modifies anything.
ErrorCode TransferLogManager::parseVerifyAndFix(
    const string &recoveryId, int64_t config, bool parseOnly,
    std::vector<FileChunksInfo> &parsedInfo) {
  if (fd_ < 0) {
    return INVALID_LOG;
  }
  LogParser parser(options_, encoderDecoder_, rootDir_, recoveryId, config,
                   parseOnly);
  resumptionStatus_ = parser.parseLog(fd_, senderIp_, parsedInfo);
  switch (resumptionStatus_) {
    case INVALID_LOG:
      closeLog();
      break;
    case INCONSISTENT_DIRECTORY:
      if (!parseOnly) {
        invalidateDirectory();
      }
      break;
    default:
      break;
  }
  WLOG(INFO) << "Transfer log parsing finished "
             << errorCodeToStr(resumptionStatus_);
  return resumptionStatus_;
}
void TransferLogManager::compactLog() {
auto fullLogPath = getFullPath(kWdtLogName);
WLOG(INFO) << "Started compacting transfer log " << fullLogPath;
if (fd_ < 0) {
WLOG(ERROR) << "Failed to compact transfer log because log handle has "
<< "been closed";
return;
}
// Shutdown writer thread to avoid race conditon against fd_
shutdownThread();
// Make sure log data is flushed.
fsyncLog();
if (::lseek(fd_, 0, SEEK_SET) < 0) {
WPLOG(ERROR) << "lseek failed for fd " << fd_;
return;
}
std::vector<FileChunksInfo> fileChunksInfoVec;
auto code = parseAndMatch(recoveryId_, config_, fileChunksInfoVec);
if (code != OK) {
WLOG(ERROR) << "Failed to parse " << fullLogPath << " "
<< errorCodeToStr(code);
return;
}
for (const auto &fileChunksInfo : fileChunksInfoVec) {
// Found multiple chunks for a file.
if (fileChunksInfo.getChunks().size() != 1) {
WLOG(ERROR) << "File " << fileChunksInfo.getFileName()
<< " has fragemented log entries";
return;
}
if (fileChunksInfo.getTotalChunkSize() != fileChunksInfo.getFileSize()) {
WLOG(ERROR) << "File " << fileChunksInfo.getFileName()
<< " has mismatched total chunk size and file size";
return;
}
}
WLOG(INFO) << "Successfully verified transfer log integrity";
if (::ftruncate(fd_, 0) != 0) {
WPLOG(ERROR) << "ftruncate failed for fd " << fd_;
return;
}
if (::lseek(fd_, 0, SEEK_SET) < 0) {
WPLOG(ERROR) << "lseek failed for fd " << fd_;
return;
}
writeLogHeader();
for (const auto &fileChunksInfo : fileChunksInfoVec) {
addFileCreationEntry(fileChunksInfo.getFileName(),
fileChunksInfo.getSeqId(),
fileChunksInfo.getFileSize());
addBlockWriteEntry(fileChunksInfo.getSeqId(), 0,
fileChunksInfo.getFileSize());
}
std::vector<string> entries;
{
std::lock_guard<std::mutex> lock(mutex_);
entries = entries_;
entries_.clear();
}
bool writeSuccess = writeEntriesToDiskNoLock(entries);
if (writeSuccess) {
WLOG(INFO) << "Finished compacting transfer log";
} else {
WLOG(ERROR) << "Failed compacting transfer log";
}
}
// Creates a parser for one pass over a transfer log.
// `recoveryId` and `config` are the values the log header must match unless
// `parseOnly` is set. In parse-only mode entries are printed to stdout and no
// verification or log modification is done.
// NOTE(review): options_/encoderDecoder_ are presumably stored by reference,
// so the caller must keep them alive for the parser's lifetime (TODO confirm
// against the header declaration).
LogParser::LogParser(const WdtOptions &options,
LogEncoderDecoder &encoderDecoder, const string &rootDir,
const string &recoveryId, int64_t config, bool parseOnly)
: options_(options),
encoderDecoder_(encoderDecoder),
rootDir_(rootDir),
recoveryId_(recoveryId),
config_(config),
parseOnly_(parseOnly) {
}
bool LogParser::writeFileInvalidationEntries(int fd,
const std::set<int64_t> &seqIds) {
char buf[TransferLogManager::kMaxEntryLength];
for (auto seqId : seqIds) {
int64_t size =
encoderDecoder_.encodeFileInvalidationEntry(buf, sizeof(buf), seqId);
int64_t written = ::write(fd, buf, size);
if (written != size) {
WPLOG(ERROR) << "Disk write error while writing invalidation entry to "
"transfer log "
<< written << " " << size;
return false;
}
}
return true;
}
// Drops the last `extraBytes` bytes of the file behind `fd`, for example a
// partially written trailing entry, and moves the file offset to the new end.
// Returns false if fstat, ftruncate or lseek fails.
bool LogParser::truncateExtraBytesAtEnd(int fd, int64_t extraBytes) {
  WLOG(INFO) << "Removing extra " << extraBytes
             << " bytes from the end of transfer log";
  struct stat fileInfo;
  if (fstat(fd, &fileInfo) != 0) {
    WPLOG(ERROR) << "fstat failed on fd " << fd;
    return false;
  }
  const off_t fileSize = fileInfo.st_size;
  const off_t newSize = fileSize - extraBytes;
  if (::ftruncate(fd, newSize) != 0) {
    WPLOG(ERROR) << "ftruncate failed for fd " << fd;
    return false;
  }
  // ftruncate leaves the offset untouched; reposition it at the new end
  if (::lseek(fd, newSize, SEEK_SET) < 0) {
    WPLOG(ERROR) << "lseek failed for fd " << fd;
    return false;
  }
  return true;
}
// Forgets all per-file state gathered so far (valid files, their logged sizes
// and invalid sequence ids). It is used when the directory becomes
// inconsistent mid-log.
void LogParser::clearParsedData() {
  invalidSeqIds_.clear();
  seqIdToSizeMap_.clear();
  fileInfoMap_.clear();
}
// Formats a microsecond timestamp as local time "mm/dd/yy HH:MM:SS.MMMMMM".
// This assumes Clock's epoch is Posix's epoch (1970/1/1).
// to_time_t is unfortunately only on the system_clock and not
// on high_resolution_clock (on MacOS at least it isn't)
// Timestamps come from a possibly corrupted log, so a value localtime_r cannot
// convert yields "<invalid timestamp N>" rather than reading an uninitialized
// struct tm.
string LogParser::getFormattedTimestamp(int64_t timestampMicros) {
  const int64_t kMicrosPerSecond = 1000 * 1000;
  int64_t wholeSeconds = timestampMicros / kMicrosPerSecond;
  int64_t micros = timestampMicros % kMicrosPerSecond;
  if (micros < 0) {
    // Integer division truncates toward zero; floor instead so the fractional
    // part stays in [0, 1e6) for pre-epoch timestamps.
    micros += kMicrosPerSecond;
    --wholeSeconds;
  }
  const time_t seconds = static_cast<time_t>(wholeSeconds);
  struct tm tm;
  if (localtime_r(&seconds, &tm) == nullptr) {
    return folly::to<string>("<invalid timestamp ", timestampMicros, ">");
  }
  // The nominal format needs 24 chars + NUL; the extra room avoids truncation
  // should a field (e.g. a negative year) be wider than expected.
  char buf[32];
  snprintf(buf, sizeof(buf), "%02d/%02d/%02d %02d:%02d:%02d.%06d",
           tm.tm_mon + 1, tm.tm_mday, (tm.tm_year % 100), tm.tm_hour,
           tm.tm_min, tm.tm_sec, static_cast<int>(micros));
  return buf;
}
// Handles a HEADER entry whose payload is buf[0, size), with `max` the
// capacity of `buf`. It decodes the header into `senderIp` and the locals.
// Returns INVALID_LOG for an oversized, undecodable, wrong-version or
// empty-sender-ip header.
// In parse-only mode it prints the header and accepts it. Otherwise a
// recovery-id or config mismatch yields INCONSISTENT_DIRECTORY.
// On success, marks the header as parsed so the following entries are
// accepted.
ErrorCode LogParser::processHeaderEntry(char *buf, int64_t max, int64_t size,
                                        string &senderIp) {
  if (size > max) {
    WLOG(ERROR) << "Bad size " << size << " vs max " << max;
    return INVALID_LOG;
  }
  int64_t timestamp;
  int logVersion;
  string logRecoveryId;
  int64_t logConfig;
  const bool decoded = encoderDecoder_.decodeLogHeader(
      buf, size, timestamp, logVersion, logRecoveryId, senderIp, logConfig);
  if (!decoded) {
    WLOG(ERROR) << "Couldn't decode the log header";
    return INVALID_LOG;
  }
  if (logVersion != TransferLogManager::WLOG_VERSION) {
    WLOG(ERROR) << "Can not parse log version " << logVersion
                << ", parser version " << TransferLogManager::WLOG_VERSION;
    return INVALID_LOG;
  }
  if (senderIp.empty()) {
    WLOG(ERROR) << "Log header has empty sender ip";
    return INVALID_LOG;
  }
  if (parseOnly_) {
    // we do not perform verifications for parse only mode
    std::cout << getFormattedTimestamp(timestamp)
              << " Header entry, log-version " << logVersion << " recovery-id "
              << logRecoveryId << " sender-ip " << senderIp << " config "
              << logConfig << std::endl;
  } else {
    if (recoveryId_ != logRecoveryId) {
      WLOG(ERROR) << "Current recovery-id does not match with log recovery-id "
                  << recoveryId_ << " " << logRecoveryId;
      return INCONSISTENT_DIRECTORY;
    }
    if (config_ != logConfig) {
      WLOG(ERROR) << "Current config does not match with log config "
                  << config_ << " " << logConfig;
      return INCONSISTENT_DIRECTORY;
    }
  }
  headerParsed_ = true;
  return OK;
}
// Handles a FILE_CREATION entry whose payload is buf[0, size).
// In parse-only mode it just prints the entry. Otherwise it requires a
// preceding header and log-based (not dir-tree) resumption, and rejects a
// seq-id seen before (INVALID_LOG).
// It then stats the file under rootDir_. The file is recorded as valid, with
// its on-disk size and logged size, if stat succeeds and, when files are
// preallocated, the disk size is at least the logged size. Otherwise the
// seq-id is marked invalid. Both outcomes return OK.
ErrorCode LogParser::processFileCreationEntry(char *buf, int64_t size) {
  if (!headerParsed_) {
    WLOG(ERROR)
        << "Invalid log: File creation entry found before transfer log header";
    return INVALID_LOG;
  }
  int64_t timestamp, seqId, fileSize;
  string fileName;
  const bool decoded = encoderDecoder_.decodeFileCreationEntry(
      buf, size, timestamp, fileName, seqId, fileSize);
  if (!decoded) {
    return INVALID_LOG;
  }
  if (parseOnly_) {
    std::cout << getFormattedTimestamp(timestamp) << " File created "
              << fileName << " seq-id " << seqId << " file-size " << fileSize
              << std::endl;
    return OK;
  }
  if (options_.resume_using_dir_tree) {
    WLOG(ERROR) << "Can not have a file creation entry in directory based "
                   "resumption mode "
                << fileName << " " << seqId << " " << fileSize;
    return INVALID_LOG;
  }
  const bool alreadySeen = fileInfoMap_.count(seqId) != 0 ||
                           invalidSeqIds_.count(seqId) != 0;
  if (alreadySeen) {
    WLOG(ERROR) << "Multiple FILE_CREATION entry for same sequence-id "
                << fileName << " " << seqId << " " << fileSize;
    return INVALID_LOG;
  }
  string fullPath;
  folly::toAppend(rootDir_, fileName, &fullPath);
  struct stat fileStat;
  bool sizeOk = false;
  if (stat(fullPath.c_str(), &fileStat) != 0) {
    WPLOG(ERROR) << "stat failed for " << fileName;
  } else {
    // With preallocation the file must be at least as large as logged;
    // without it any existing file passes.
    sizeOk = !options_.shouldPreallocateFiles() || fileStat.st_size >= fileSize;
  }
  if (!sizeOk) {
    WLOG(INFO) << "Sanity check failed for " << fileName << " seq-id " << seqId
               << " file-size " << fileSize;
    invalidSeqIds_.insert(seqId);
    return OK;
  }
  fileInfoMap_.emplace(seqId,
                       FileChunksInfo(seqId, fileName, fileStat.st_size));
  seqIdToSizeMap_.emplace(seqId, fileSize);
  return OK;
}
// Handles a FILE_RESIZE entry whose payload is buf[0, size).
// In parse-only mode it just prints the entry. Otherwise it requires a
// preceding header and log-based resumption. It then updates the logged size
// of a known file, rejecting shrinking and, with preallocation, growth beyond
// the on-disk size.
// A resize for a seq-id already marked invalid (its creation sanity check
// failed) is ignored, just as processBlockWriteEntry ignores block entries for
// invalid ids. Previously this made the whole log INVALID_LOG.
ErrorCode LogParser::processFileResizeEntry(char *buf, int64_t size) {
  if (!headerParsed_) {
    WLOG(ERROR)
        << "Invalid log: File resize entry found before transfer log header";
    return INVALID_LOG;
  }
  int64_t timestamp, seqId, fileSize;
  if (!encoderDecoder_.decodeFileResizeEntry(buf, size, timestamp, seqId,
                                             fileSize)) {
    return INVALID_LOG;
  }
  if (parseOnly_) {
    std::cout << getFormattedTimestamp(timestamp) << " File resized,"
              << " seq-id " << seqId << " new file-size " << fileSize
              << std::endl;
    return OK;
  }
  if (options_.resume_using_dir_tree) {
    WLOG(ERROR) << "Can not have a file resize entry in directory based "
                   "resumption mode "
                << seqId << " " << fileSize;
    return INVALID_LOG;
  }
  if (invalidSeqIds_.find(seqId) != invalidSeqIds_.end()) {
    WLOG(INFO) << "File resize entry for an invalid sequence-id " << seqId
               << ", ignoring";
    return OK;
  }
  auto it = fileInfoMap_.find(seqId);
  if (it == fileInfoMap_.end()) {
    WLOG(ERROR) << "File resize entry for unknown sequence-id " << seqId << " "
                << fileSize;
    return INVALID_LOG;
  }
  FileChunksInfo &chunksInfo = it->second;
  const string &fileName = chunksInfo.getFileName();
  auto sizeIt = seqIdToSizeMap_.find(seqId);
  // Every valid seq-id gets a logged size when its creation entry is processed
  WDT_CHECK(sizeIt != seqIdToSizeMap_.end());
  if (fileSize < sizeIt->second) {
    WLOG(ERROR) << "File size can not reduce during resizing " << fileName
                << " " << seqId << " " << fileSize << " " << sizeIt->second;
    return INVALID_LOG;
  }
  if (options_.shouldPreallocateFiles() &&
      fileSize > chunksInfo.getFileSize()) {
    WLOG(ERROR) << "Size on the disk is less than the resized size for "
                << fileName << " seq-id " << seqId << " disk-size "
                << chunksInfo.getFileSize() << " resized-size " << fileSize;
    return INVALID_LOG;
  }
  sizeIt->second = fileSize;
  return OK;
}
// Handles a BLOCK_WRITE entry whose payload is buf[0, size).
// In parse-only mode it just prints the entry. Otherwise it requires a
// preceding header and log-based resumption, and ignores entries for invalid
// seq-ids. For a known file it records [offset, offset + blockSize) as a
// written chunk after checking that the block lies within the on-disk size.
// The values come from an on-disk log that may be corrupted. Negative offsets
// or sizes are therefore rejected, and the bound check is written so that
// offset + blockSize cannot overflow.
ErrorCode LogParser::processBlockWriteEntry(char *buf, int64_t size) {
  if (!headerParsed_) {
    WLOG(ERROR)
        << "Invalid log: Block write entry found before transfer log header";
    return INVALID_LOG;
  }
  int64_t timestamp, seqId, offset, blockSize;
  if (!encoderDecoder_.decodeBlockWriteEntry(buf, size, timestamp, seqId,
                                             offset, blockSize)) {
    return INVALID_LOG;
  }
  if (parseOnly_) {
    std::cout << getFormattedTimestamp(timestamp) << " Block written,"
              << " seq-id " << seqId << " offset " << offset << " block-size "
              << blockSize << std::endl;
    return OK;
  }
  if (options_.resume_using_dir_tree) {
    WLOG(ERROR) << "Can not have a block write entry in directory based "
                   "resumption mode "
                << seqId << " " << offset << " " << blockSize;
    return INVALID_LOG;
  }
  if (invalidSeqIds_.find(seqId) != invalidSeqIds_.end()) {
    WLOG(INFO) << "Block entry for an invalid sequence-id " << seqId
               << ", ignoring";
    return OK;
  }
  auto it = fileInfoMap_.find(seqId);
  if (it == fileInfoMap_.end()) {
    WLOG(ERROR) << "Block entry for unknown sequence-id " << seqId << " "
                << offset << " " << blockSize;
    return INVALID_LOG;
  }
  FileChunksInfo &chunksInfo = it->second;
  if (offset < 0 || blockSize < 0) {
    WLOG(ERROR) << "Invalid block entry with negative offset or size "
                << chunksInfo.getFileName() << " seq-id " << seqId << " offset "
                << offset << " block-size " << blockSize;
    return INVALID_LOG;
  }
  // check whether the block is within disk size
  const int64_t diskSize = chunksInfo.getFileSize();
  if (offset > diskSize || blockSize > diskSize - offset) {
    WLOG(ERROR) << "Block end point is greater than file size in disk "
                << chunksInfo.getFileName() << " seq-id " << seqId << " offset "
                << offset << " block-size " << blockSize
                << " file size in disk " << chunksInfo.getFileSize();
    return INVALID_LOG;
  }
  chunksInfo.addChunk(Interval(offset, offset + blockSize));
  return OK;
}
// Handles a FILE_INVALIDATION entry whose payload is buf[0, size).
// In parse-only mode it just prints the entry. Otherwise it requires a
// preceding header and log-based resumption. It then removes the seq-id from
// both the valid and the invalid sets. An id present in neither set is
// INVALID_LOG.
ErrorCode LogParser::processFileInvalidationEntry(char *buf, int64_t size) {
  if (!headerParsed_) {
    WLOG(ERROR) << "Invalid log: File invalidation entry found before transfer "
                   "log header";
    return INVALID_LOG;
  }
  int64_t timestamp, seqId;
  const bool decoded =
      encoderDecoder_.decodeFileInvalidationEntry(buf, size, timestamp, seqId);
  if (!decoded) {
    return INVALID_LOG;
  }
  if (parseOnly_) {
    std::cout << getFormattedTimestamp(timestamp)
              << " Invalidation entry for seq-id " << seqId << std::endl;
    return OK;
  }
  if (options_.resume_using_dir_tree) {
    WLOG(ERROR) << "Can not have a file invalidation entry in directory based "
                   "resumption mode "
                << seqId;
    return INVALID_LOG;
  }
  const bool known = fileInfoMap_.count(seqId) != 0 ||
                     invalidSeqIds_.count(seqId) != 0;
  if (!known) {
    WLOG(ERROR) << "Invalidation entry for an unknown sequence id " << seqId;
    return INVALID_LOG;
  }
  fileInfoMap_.erase(seqId);
  invalidSeqIds_.erase(seqId);
  return OK;
}
// Handles a DIRECTORY_INVALIDATION entry whose payload is buf[0, size).
// In parse-only mode it just prints the entry. Otherwise it clears
// headerParsed_ and returns INCONSISTENT_DIRECTORY. parseLog then drops the
// parsed state and skips everything until the next header.
ErrorCode LogParser::processDirectoryInvalidationEntry(char *buf,
                                                       int64_t size) {
  int64_t timestamp;
  const bool decoded =
      encoderDecoder_.decodeDirectoryInvalidationEntry(buf, size, timestamp);
  if (!decoded) {
    return INVALID_LOG;
  }
  if (!parseOnly_) {
    headerParsed_ = false;
    return INCONSISTENT_DIRECTORY;
  }
  std::cout << getFormattedTimestamp(timestamp) << " Directory invalidated"
            << std::endl;
  return OK;
}
// Reads the whole log from the current offset of `fd` and replays its
// entries. Each entry on disk is [int16 length][type byte][payload], where the
// length covers the type byte plus payload.
// - `senderIp` receives the sender ip from the (last processed) header.
// - On OK, `fileChunksInfo` gets the merged chunk info of every valid file,
//   and a FILE_INVALIDATION entry is appended for every seq-id that failed its
//   sanity check.
// - A truncated trailing entry, presumably from a partially completed write,
//   is removed from the file, or only reported in parse-only mode.
// Returns OK, INCONSISTENT_DIRECTORY (if the last state seen was an invalid
// directory) or INVALID_LOG on read/format errors.
ErrorCode LogParser::parseLog(int fd, string &senderIp,
std::vector<FileChunksInfo> &fileChunksInfo) {
char entry[TransferLogManager::kMaxEntryLength];
// empty log is valid
ErrorCode status = OK;
while (true) {
// Read the 2-byte length prefix of the next entry.
int16_t entrySize;
int64_t toRead = sizeof(int16_t);
int64_t numRead = ::read(fd, &entrySize, toRead);
if (numRead < 0) {
WPLOG(ERROR) << "Error while reading transfer log " << numRead << " "
<< toRead;
return INVALID_LOG;
}
if (numRead == 0) {
WVLOG(1) << "got EOF, toRead " << toRead;
break;
}
if (numRead != toRead) {
// extra bytes at the end, most likely part of the previous write
// succeeded partially
if (parseOnly_) {
WLOG(INFO) << "Extra " << numRead << " bytes at the end of the log";
} else if (!truncateExtraBytesAtEnd(fd, numRead)) {
return INVALID_LOG;
}
break;
}
// The length must be positive (at least the type byte) and fit `entry`.
if (entrySize <= 0 || entrySize > TransferLogManager::kMaxEntryLength) {
WLOG(ERROR) << "Transfer log parse error, invalid entry length "
<< entrySize;
return INVALID_LOG;
}
// Read the type byte and payload.
numRead = ::read(fd, entry, entrySize);
if (numRead < 0) {
WPLOG(ERROR) << "Error while reading transfer log " << numRead << " "
<< entrySize;
return INVALID_LOG;
}
if (numRead != entrySize) {
// extra bytes also includes the size entry
int64_t extraBytes = numRead + sizeof(int16_t);
if (parseOnly_) {
WLOG(INFO) << "Extra " << extraBytes << " bytes at the end of the log";
} else if (!truncateExtraBytesAtEnd(fd, extraBytes)) {
return INVALID_LOG;
}
break;
}
TransferLogManager::EntryType type =
(TransferLogManager::EntryType)entry[0];
if (status == INCONSISTENT_DIRECTORY &&
type != TransferLogManager::HEADER) {
// If the directory is invalid, no need to process any entry other than
// header, because only a header can validate a directory
continue;
}
// Handlers receive the payload without the type byte.
char *buf = entry + 1;
const int64_t bufSize = sizeof(entry) - 1;
const int64_t entryLen = entrySize - 1;
switch (type) {
case TransferLogManager::HEADER:
status = processHeaderEntry(buf, bufSize, entryLen, senderIp);
break;
case TransferLogManager::FILE_CREATION:
status = processFileCreationEntry(buf, entryLen);
break;
case TransferLogManager::BLOCK_WRITE:
status = processBlockWriteEntry(buf, entryLen);
break;
case TransferLogManager::FILE_RESIZE:
status = processFileResizeEntry(buf, entryLen);
break;
case TransferLogManager::FILE_INVALIDATION:
status = processFileInvalidationEntry(buf, entryLen);
break;
case TransferLogManager::DIRECTORY_INVALIDATION:
status = processDirectoryInvalidationEntry(buf, entryLen);
break;
default:
WLOG(ERROR) << "Invalid entry type found " << type;
return INVALID_LOG;
}
if (status == INVALID_LOG) {
WLOG(ERROR) << "Invalid transfer log";
return status;
}
// Everything gathered before an invalidation is void; a later header may
// start a fresh valid state.
if (status == INCONSISTENT_DIRECTORY) {
clearParsedData();
}
}
if (status == OK) {
// Hand out the merged per-file chunk info (moved out of fileInfoMap_).
for (auto &pair : fileInfoMap_) {
FileChunksInfo &fileInfo = pair.second;
fileInfo.mergeChunks();
fileChunksInfo.emplace_back(std::move(fileInfo));
}
// Persist the sanity-check failures so later parses see them invalidated.
if (!invalidSeqIds_.empty()) {
if (!writeFileInvalidationEntries(fd, invalidSeqIds_)) {
return INVALID_LOG;
}
}
}
return status;
}
}
}