in util/TransferLogManager.cpp [1015:1119]
/**
 * Reads the transfer log from fd entry by entry until EOF and dispatches each
 * entry to the matching process*Entry handler.
 *
 * On-disk layout consumed here: every entry is a length prefix (read directly
 * into an int16_t) followed by that many bytes, the first of which is the
 * TransferLogManager::EntryType and the rest the entry payload.
 *
 * A partially written record at the tail of the log (short read of either the
 * length prefix or the entry body) ends parsing. In parseOnly_ mode it is only
 * logged, otherwise truncateExtraBytesAtEnd() is asked to remove it.
 *
 * @param fd              descriptor of the log, read from its current offset
 * @param senderIp        forwarded to processHeaderEntry() (presumably filled
 *                        in from the header - confirm against that handler)
 * @param fileChunksInfo  when the final status is OK, receives every value of
 *                        fileInfoMap_ after mergeChunks(); the values are
 *                        moved out of the map
 *
 * @return INVALID_LOG on read failure, out-of-range entry length, unknown
 *         entry type, a handler reporting INVALID_LOG, or failure to
 *         truncate / write invalidation entries; otherwise the status of the
 *         last processed entry (OK for an empty log)
 */
ErrorCode LogParser::parseLog(int fd, string &senderIp,
                              std::vector<FileChunksInfo> &fileChunksInfo) {
  // width of the length prefix stored in front of every entry
  const int64_t lenPrefixBytes = sizeof(int16_t);
  char entry[TransferLogManager::kMaxEntryLength];

  // Handles an incomplete record at the end of the log, most likely left
  // behind by a write that succeeded only partially. Returns false only if
  // the truncation was attempted and failed.
  auto dropPartialTail = [this, fd](int64_t tailBytes) -> bool {
    if (parseOnly_) {
      WLOG(INFO) << "Extra " << tailBytes << " bytes at the end of the log";
      return true;
    }
    return truncateExtraBytesAtEnd(fd, tailBytes);
  };

  // empty log is valid
  ErrorCode status = OK;
  for (;;) {
    // ---- length prefix ----
    int16_t entrySize = 0;
    int64_t bytesRead = ::read(fd, &entrySize, lenPrefixBytes);
    if (bytesRead < 0) {
      WPLOG(ERROR) << "Error while reading transfer log " << bytesRead << " "
                   << lenPrefixBytes;
      return INVALID_LOG;
    }
    if (bytesRead == 0) {
      WVLOG(1) << "got EOF, toRead " << lenPrefixBytes;
      break;
    }
    if (bytesRead != lenPrefixBytes) {
      // only part of the length prefix is present
      if (!dropPartialTail(bytesRead)) {
        return INVALID_LOG;
      }
      break;
    }
    if (entrySize <= 0 || entrySize > TransferLogManager::kMaxEntryLength) {
      WLOG(ERROR) << "Transfer log parse error, invalid entry length "
                  << entrySize;
      return INVALID_LOG;
    }

    // ---- entry body ----
    bytesRead = ::read(fd, entry, entrySize);
    if (bytesRead < 0) {
      WPLOG(ERROR) << "Error while reading transfer log " << bytesRead << " "
                   << entrySize;
      return INVALID_LOG;
    }
    if (bytesRead != entrySize) {
      // the dangling bytes include the length prefix that was already consumed
      if (!dropPartialTail(bytesRead + lenPrefixBytes)) {
        return INVALID_LOG;
      }
      break;
    }

    const TransferLogManager::EntryType type =
        static_cast<TransferLogManager::EntryType>(entry[0]);
    // Once the directory has been found inconsistent, only a header entry can
    // validate it again, so everything else is skipped.
    if (type != TransferLogManager::HEADER &&
        status == INCONSISTENT_DIRECTORY) {
      continue;
    }

    // payload starts right after the type byte
    char *payload = entry + 1;
    const int64_t payloadCapacity = sizeof(entry) - 1;
    const int64_t payloadLen = entrySize - 1;
    switch (type) {
      case TransferLogManager::HEADER:
        status =
            processHeaderEntry(payload, payloadCapacity, payloadLen, senderIp);
        break;
      case TransferLogManager::FILE_CREATION:
        status = processFileCreationEntry(payload, payloadLen);
        break;
      case TransferLogManager::BLOCK_WRITE:
        status = processBlockWriteEntry(payload, payloadLen);
        break;
      case TransferLogManager::FILE_RESIZE:
        status = processFileResizeEntry(payload, payloadLen);
        break;
      case TransferLogManager::FILE_INVALIDATION:
        status = processFileInvalidationEntry(payload, payloadLen);
        break;
      case TransferLogManager::DIRECTORY_INVALIDATION:
        status = processDirectoryInvalidationEntry(payload, payloadLen);
        break;
      default:
        WLOG(ERROR) << "Invalid entry type found " << type;
        return INVALID_LOG;
    }

    if (status == INVALID_LOG) {
      WLOG(ERROR) << "Invalid transfer log";
      return status;
    }
    if (status == INCONSISTENT_DIRECTORY) {
      // drop whatever was accumulated from the entries seen so far
      clearParsedData();
    }
  }

  if (status != OK) {
    return status;
  }

  // Parsing succeeded: hand the merged per-file chunk information to the
  // caller.
  for (auto &mapEntry : fileInfoMap_) {
    FileChunksInfo &chunksInfo = mapEntry.second;
    chunksInfo.mergeChunks();
    fileChunksInfo.emplace_back(std::move(chunksInfo));
  }
  if (!invalidSeqIds_.empty() &&
      !writeFileInvalidationEntries(fd, invalidSeqIds_)) {
    return INVALID_LOG;
  }
  return status;
}