util/TransferLogManager.h (143 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/Protocol.h>
#include <wdt/WdtOptions.h>
#include <condition_variable>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
namespace facebook {
namespace wdt {
/**
* Download Resumption in WDT:
* WDT can resume download in two modes.
*
* 1) Log based resumption : In this mode, WDT writes all important events like
* start of a new transfer, file creation, file resize, block write to .wdt.log
* in the destination directory. Before the transfer is started, this log is
* parsed and a list of FileChunksInfo is created. This information is passed to
* the sender which does a diff and sends only the missing portions.
*
* 2) Size based resumption : In this mode, WDT traverses the destination
* directory and trusts the size of the file. Sender only sends the extra
* portions of the file. Before traversing the directory .wdt.log is checked to
* see if previous transfers were also done in this mode or not. If the mode has
* changed, we add an INCONSISTENT_DIRECTORY entry to the log and transfer
* everything. This mode enabled by setting resume_using_dir_tree option to
* true.
*/
constexpr char kWdtLogName[] = ".wdt.log";
constexpr char kWdtBuggyLogName[] = ".wdt.log.bug";
/**
* class responsible for encoding and decoding transfer log entries
*/
class LogEncoderDecoder {
public:
/// encodes header entry
int64_t encodeLogHeader(char *dest, int64_t max,
const std::string &recoveryId,
const std::string &senderIp, int64_t config);
/// decodes header entry
bool decodeLogHeader(char *buf, int16_t size, int64_t ×tamp,
int &version, std::string &recoveryId,
std::string &senderIp, int64_t &config);
/// encodes file creation entry
int64_t encodeFileCreationEntry(char *dest, int64_t max,
const std::string &fileName,
const int64_t seqId, const int64_t fileSize);
/// decodes file creation entry
bool decodeFileCreationEntry(char *buf, int16_t size, int64_t ×tamp,
std::string &fileName, int64_t &seqId,
int64_t &fileSize);
/// encodes block write entry
int64_t encodeBlockWriteEntry(char *dest, int64_t max, const int64_t seqId,
const int64_t offset, const int64_t blockSize);
/// decodes block write entry
bool decodeBlockWriteEntry(char *buf, int16_t size, int64_t ×tamp,
int64_t &seqId, int64_t &offset,
int64_t &blockSize);
/// encodes file resize entry
int64_t encodeFileResizeEntry(char *dest, int64_t max, const int64_t seqId,
const int64_t fileSize);
/// decodes file resize entry
bool decodeFileResizeEntry(char *buf, int16_t size, int64_t ×tamp,
int64_t &seqId, int64_t &fileSize);
/// encodes file invalidation entry
int64_t encodeFileInvalidationEntry(char *dest, int64_t max,
const int64_t &seqId);
/// decodes file invalidation entry
bool decodeFileInvalidationEntry(char *buf, int16_t size, int64_t ×tamp,
int64_t &seqId);
/// encodes directory invalidation entry
int64_t encodeDirectoryInvalidationEntry(char *buf, int64_t max);
/// decodes directory invalidation entry
bool decodeDirectoryInvalidationEntry(char *buf, int16_t size,
int64_t ×tamp);
private:
/// @return timestamp in microseconds
int64_t timestampInMicroseconds() const;
};
/**
* This class manages reads and writes to receiver side transfer log. This
* class buffers writes to transfer log and starts a writer thread to
* periodically write log entries to the disk. This also has a function to read
* all the entries and correct the log if needed.
*/
class TransferLogManager {
public:
const static int WLOG_VERSION;
enum EntryType {
HEADER, // log header
FILE_CREATION, // File created and space allocated
BLOCK_WRITE, // Complete block fsynced to disk
FILE_INVALIDATION, // Missing file
FILE_RESIZE, // File Resized
DIRECTORY_INVALIDATION, // Directory content is invalid
};
/// 2 bytes for entry size, 1 byte for entry-type, PATH_MAX for file-name, 10
/// bytes for seq-id, 10 bytes for file-size, 10 bytes for timestamp
static const int64_t kMaxEntryLength = 2 + 1 + 10 + PATH_MAX + 2 * 10;
TransferLogManager(const WdtOptions &options, const std::string &rootDir)
: options_(options) {
rootDir_ = rootDir;
if (rootDir_.back() != '/') {
rootDir_.push_back('/');
}
}
/**
* Opens the log for reading and writing. In case of log based
* resumption, starts writer thread. If the log is not present, a new one is
* created
*/
ErrorCode openLog();
/// Start the log writer thread
ErrorCode startThread();
/**
* In case of log based resumption, signals to the writer thread to finish.
* Waits for the writer thread to finish. Closes the transfer log.
*/
void closeLog();
/**
* Verifies sender ip
*
* @param curSenderIp current sender ip
*
* @return whether sender-ip matched log ip
*/
bool verifySenderIp(const std::string &curSenderIp);
/**
* If resumption is based on directory tree, writes log header to the
* transfer log. Else, adds a file creation entry to the log buffer
*/
void writeLogHeader();
/**
* Adds a file creation entry to the log buffer
*
* @param fileName Name of the file
* @param seqId seq-id of the file
* @param fileSize size of the file
*/
void addFileCreationEntry(const std::string &fileName, int64_t seqId,
int64_t fileSize);
/**
* Adds a block write entry to the log buffer
*
* @param seqId seq-id of the file
* @param offset block offset
* @param blockSize size of the block
*/
void addBlockWriteEntry(int64_t seqId, int64_t offset, int64_t blockSize);
/**
* Adds a file resize entry to the log buffer
*
* @param seqId seq-id of the file
* @param fileSize size of the file
*/
void addFileResizeEntry(int64_t seqId, int64_t fileSize);
/**
* Adds an invalidation entry to the log buffer
*
* @param seqId seq-id of the file
*/
void addFileInvalidationEntry(int64_t seqId);
/// Writes an invalidation entry for the directory to the transfer log.
void invalidateDirectory();
/**
* parses the transfer log and prints entries
*
* @return Whether the log is valid or not
*/
bool parseAndPrint();
/// renames .wdt.log to .wdt.log.bug
void renameBuggyLog();
/// Compacts transfer log
void compactLog();
/**
* parses transfer log, does validation and fixes the log in case of partial
* writes from previous transfer. Also parsed info is cached for later use and
* can be accessed through getParsedFileChunksInfo
*
* @param recoveryId recovery-id of the current transfer
* @param config transfer config encoded as int
* @param fileChunksInfo this vector is populated with parsed chunks info
*
* @return status of the parsing
*/
ErrorCode parseAndMatch(const std::string &recoveryId, int64_t config,
std::vector<FileChunksInfo> &fileChunksInfo);
/// @return returns current resumption status
ErrorCode getResumptionStatus();
/**
* Unlinks wdt transfer log
*/
void unlink();
/// destructor
~TransferLogManager();
private:
/// Shutdown the log writer thread
void shutdownThread();
std::string getFullPath(const std::string &relPath);
/**
* entry point for the writer thread. This thread periodically writes buffer
* contents to disk
*/
void threadProcWriteEntriesToDisk();
/// Write 'entries' to disk synchronously
/// return true if entries are written successfully, otherwise false.
bool writeEntriesToDiskNoLock(const std::vector<std::string> &entries);
/// fsync transfer log
void fsyncLog();
/// Check log or directory hasn't been removed under us
/// TODO: consider calling periodically from the thread for early warning
ErrorCode checkLog();
/// closes transfer log
void close();
/**
* Parses the transfer log. Verifies if all the file exists or not(This is
* done to verify whether directory entries were synced to disk before or
* not). Also writes invalidation entries for files with verification failure.
*
* @param recoveryId recovery-id, this is verified against the logged
* recovery-id
* @param config config of the current transfer
* @param parseOnly If true, all parsed entries are logged, and the
* log is not modified or verified
* @param parsedInfo vector to populate with parsed data, only
* populated if parseOnly is false
*
* @return Log parsing status
*/
ErrorCode parseVerifyAndFix(const std::string &recoveryId, int64_t config,
bool parseOnly,
std::vector<FileChunksInfo> &parsedInfo);
LogEncoderDecoder encoderDecoder_;
const WdtOptions &options_;
/// File handler for writing
int fd_{-1};
/// root directory
std::string rootDir_;
/// recovery id
std::string recoveryId_;
/// sender ip
std::string senderIp_;
/// transfer config
int64_t config_;
/// Entry buffer
std::vector<std::string> entries_;
/// Flag to signal end to the writer thread
bool finished_{false};
/// current resumption status
ErrorCode resumptionStatus_;
/// Whether the header is written or not. If it is not written, that means the
/// sender is has not asked for file chunks, which means that sender does not
/// want to resume. If this is false, file-creation, block-write and
/// file-resize entries are not written.
bool headerWritten_{false};
/// Writer thread
std::thread writerThread_;
std::mutex mutex_;
std::condition_variable conditionFinished_;
};
/// class responsible for parsing and fixing transfer log
class LogParser {
public:
LogParser(const WdtOptions &options, LogEncoderDecoder &encoderDecoder,
const std::string &rootDir, const std::string &recoveryId,
int64_t config, bool parseOnly);
ErrorCode parseLog(int fd, std::string &senderIp,
std::vector<FileChunksInfo> &fileChunksInfo);
private:
std::string getFormattedTimestamp(int64_t timestamp);
void clearParsedData();
/**
* Truncates the log
*
* @param fd file descriptor
* @param extraBytes extra bytes at the end of the file
*
* @return If successful, true, else false
*/
bool truncateExtraBytesAtEnd(int fd, int64_t extraBytes);
/**
* writes invalidation entries to the disk.
*
* @param seqIds Invalid seq-ids
*/
bool writeFileInvalidationEntries(int fd, const std::set<int64_t> &seqIds);
ErrorCode processHeaderEntry(char *buf, int64_t max, int64_t size,
std::string &senderIp);
// TODO: switch to ByteRange
ErrorCode processFileCreationEntry(char *buf, int64_t size);
ErrorCode processBlockWriteEntry(char *buf, int64_t size);
ErrorCode processFileResizeEntry(char *buf, int64_t size);
ErrorCode processFileInvalidationEntry(char *buf, int64_t size);
ErrorCode processDirectoryInvalidationEntry(char *buf, int64_t size);
const WdtOptions &options_;
LogEncoderDecoder &encoderDecoder_;
std::string rootDir_;
std::string recoveryId_;
int64_t config_;
bool parseOnly_;
/// whether header is parsed or not
bool headerParsed_{false};
/// seq-id to chunks map
std::map<int64_t, FileChunksInfo> fileInfoMap_;
/// seq-id to file-size map
std::map<int64_t, int64_t> seqIdToSizeMap_;
/// set of invalid seq-ids
std::set<int64_t> invalidSeqIds_;
};
}
}