ErrorCode LogParser::parseLog()

in util/TransferLogManager.cpp [1015:1119]


ErrorCode LogParser::parseLog(int fd, string &senderIp,
                              std::vector<FileChunksInfo> &fileChunksInfo) {
  char entry[TransferLogManager::kMaxEntryLength];
  // empty log is valid
  ErrorCode status = OK;
  while (true) {
    int16_t entrySize;
    int64_t toRead = sizeof(int16_t);
    int64_t numRead = ::read(fd, &entrySize, toRead);
    if (numRead < 0) {
      WPLOG(ERROR) << "Error while reading transfer log " << numRead << " "
                   << toRead;
      return INVALID_LOG;
    }
    if (numRead == 0) {
      WVLOG(1) << "got EOF, toRead " << toRead;
      break;
    }
    if (numRead != toRead) {
      // extra bytes at the end, most likely part of the previous write
      // succeeded partially
      if (parseOnly_) {
        WLOG(INFO) << "Extra " << numRead << " bytes at the end of the log";
      } else if (!truncateExtraBytesAtEnd(fd, numRead)) {
        return INVALID_LOG;
      }
      break;
    }
    if (entrySize <= 0 || entrySize > TransferLogManager::kMaxEntryLength) {
      WLOG(ERROR) << "Transfer log parse error, invalid entry length "
                  << entrySize;
      return INVALID_LOG;
    }
    numRead = ::read(fd, entry, entrySize);
    if (numRead < 0) {
      WPLOG(ERROR) << "Error while reading transfer log " << numRead << " "
                   << entrySize;
      return INVALID_LOG;
    }
    if (numRead != entrySize) {
      // extra bytes also includes the size entry
      int64_t extraBytes = numRead + sizeof(int16_t);
      if (parseOnly_) {
        WLOG(INFO) << "Extra " << extraBytes << " bytes at the end of the log";
      } else if (!truncateExtraBytesAtEnd(fd, extraBytes)) {
        return INVALID_LOG;
      }
      break;
    }
    TransferLogManager::EntryType type =
        (TransferLogManager::EntryType)entry[0];
    if (status == INCONSISTENT_DIRECTORY &&
        type != TransferLogManager::HEADER) {
      // If the directory is invalid, no need to process any entry other than
      // header, because only a header can validate a directory
      continue;
    }
    char *buf = entry + 1;
    const int64_t bufSize = sizeof(entry) - 1;
    const int64_t entryLen = entrySize - 1;
    switch (type) {
      case TransferLogManager::HEADER:
        status = processHeaderEntry(buf, bufSize, entryLen, senderIp);
        break;
      case TransferLogManager::FILE_CREATION:
        status = processFileCreationEntry(buf, entryLen);
        break;
      case TransferLogManager::BLOCK_WRITE:
        status = processBlockWriteEntry(buf, entryLen);
        break;
      case TransferLogManager::FILE_RESIZE:
        status = processFileResizeEntry(buf, entryLen);
        break;
      case TransferLogManager::FILE_INVALIDATION:
        status = processFileInvalidationEntry(buf, entryLen);
        break;
      case TransferLogManager::DIRECTORY_INVALIDATION:
        status = processDirectoryInvalidationEntry(buf, entryLen);
        break;
      default:
        WLOG(ERROR) << "Invalid entry type found " << type;
        return INVALID_LOG;
    }
    if (status == INVALID_LOG) {
      WLOG(ERROR) << "Invalid transfer log";
      return status;
    }
    if (status == INCONSISTENT_DIRECTORY) {
      clearParsedData();
    }
  }
  if (status == OK) {
    for (auto &pair : fileInfoMap_) {
      FileChunksInfo &fileInfo = pair.second;
      fileInfo.mergeChunks();
      fileChunksInfo.emplace_back(std::move(fileInfo));
    }
    if (!invalidSeqIds_.empty()) {
      if (!writeFileInvalidationEntries(fd, invalidSeqIds_)) {
        return INVALID_LOG;
      }
    }
  }
  return status;
}