IOStatus BackupEngineImpl::BackupMeta::LoadFromFile()

in utilities/backupable/backupable_db.cc [2771:3030]


// Populates this (currently empty) BackupMeta by parsing the meta file named
// by meta_filename_, opened through fs_.
//
// Layout of the file as parsed below, one item per line:
//   [kSchemaVersionPrefix + version]   optional; only "2" or "2.*" accepted
//   <timestamp>
//   <sequence number>
//   [<field_name> <field_data>]*       optional meta fields
//   <number of files>                  first line starting with a digit
//   <filename> [<field_name> <field_data>]*   one line per file
//   [kFooterMarker                     schema version >= 2 only
//    [<field_name> <field_data>]*]     footer fields
//
// Parameters:
//   backup_dir        prefix joined with "/" and each file name to form the
//                     key looked up in abs_path_to_size.
//   abs_path_to_size  sizes of files by absolute path; consulted only for
//                     files that GetFile() does not already know about.
//   rate_limiter      if non-null, charged (Env::IO_LOW, kRead) for the size
//                     of each line that is successfully read.
//   info_log          receives a warning for each ignored unrecognized field.
//   reported_ignored_fields
//                     must be non-null; remembers which unrecognized fields
//                     have already been warned about so that each one is
//                     logged only the first time it is seen.
//
// Returns OK on success; Corruption for malformed or inconsistent content;
// NotSupported for an unrecognized schema version or an unrecognized field
// whose name starts with kNonIgnorableFieldPrefix; otherwise the failing
// status from opening or reading the file, or from AddFile().
IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir,
    const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
    RateLimiter* rate_limiter, Logger* info_log,
    std::unordered_set<std::string>* reported_ignored_fields) {
  assert(reported_ignored_fields);
  assert(Empty());

  std::unique_ptr<LineFileReader> backup_meta_reader;
  {
    IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
                                           &backup_meta_reader, nullptr);
    if (!io_s.ok()) {
      return io_s;
    }
  }

  // If we don't read an explicit schema_version, that implies version 1,
  // which is what we call the original backup meta schema.
  int schema_major_version = 1;

  std::string line;
  // Reads the next line into `line`. When a line was read and a rate limiter
  // was supplied, charges the limiter for the bytes in that line. Returns
  // whether a line was read; read failures are handled at the end via
  // GetStatus().
  auto read_line = [&]() -> bool {
    if (!backup_meta_reader->ReadLine(&line)) {
      return false;
    }
    if (rate_limiter != nullptr) {
      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
                                 nullptr /* stats */,
                                 RateLimiter::OpType::kRead);
    }
    return true;
  };

  // First line: either the schema version marker or (schema 1) the timestamp.
  if (read_line()) {
    if (StartsWith(line, kSchemaVersionPrefix)) {
      std::string ver = line.substr(kSchemaVersionPrefix.size());
      if (ver == "2" || StartsWith(ver, "2.")) {
        schema_major_version = 2;
      } else {
        return IOStatus::NotSupported(
            "Unsupported/unrecognized schema version: " + ver);
      }
      // Line consumed; the timestamp must come from the next line.
      line.clear();
    } else if (line.empty()) {
      return IOStatus::Corruption("Unexpected empty line");
    }
  }
  // Timestamp: already in `line` if the first line was not a schema version
  // marker, otherwise read from the following line.
  if (!line.empty()) {
    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  } else if (read_line()) {
    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  }
  // Sequence number.
  if (read_line()) {
    sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  }

  // UINT32_MAX marks "file count never read"; the count check near the end
  // then reports the missing/incomplete header (unless exactly that many
  // entries were parsed).
  uint32_t num_files = UINT32_MAX;
  while (read_line()) {
    if (line.empty()) {
      return IOStatus::Corruption("Unexpected empty line");
    }
    // Number -> number of files -> exit loop reading optional meta fields
    if (line[0] >= '0' && line[0] <= '9') {
      num_files = static_cast<uint32_t>(strtoul(line.c_str(), nullptr, 10));
      break;
    }
    // else, must be a meta field assignment
    auto space_pos = line.find_first_of(' ');
    if (space_pos == std::string::npos) {
      return IOStatus::Corruption("Expected number of files or meta field");
    }
    std::string field_name = line.substr(0, space_pos);
    std::string field_data = line.substr(space_pos + 1);
    if (field_name == kAppMetaDataFieldName) {
      // app metadata present
      bool decode_success = Slice(field_data).DecodeHex(&app_metadata_);
      if (!decode_success) {
        return IOStatus::Corruption(
            "Failed to decode stored hex encoded app metadata");
      }
    } else if (schema_major_version < 2) {
      // The original schema allows no meta field other than app metadata.
      return IOStatus::Corruption("Expected number of files or \"" +
                                  kAppMetaDataFieldName + "\" field");
    } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
      return IOStatus::NotSupported("Unrecognized non-ignorable meta field " +
                                    field_name + " (from future version?)");
    } else {
      // Warn the first time we see any particular unrecognized meta field
      if (reported_ignored_fields->insert("meta:" + field_name).second) {
        ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup meta field %s",
                       field_name.c_str());
      }
    }
  }

  // File entries are collected here first and only added to this object
  // after the whole file has been parsed and the count has been checked.
  std::vector<std::shared_ptr<FileInfo>> files;
  bool footer_present = false;
  while (read_line()) {
    std::vector<std::string> components = StringSplit(line, ' ');

    if (components.size() < 1) {
      return IOStatus::Corruption("Empty line instead of file entry.");
    }
    if (schema_major_version >= 2 && components.size() == 2 &&
        line == kFooterMarker) {
      footer_present = true;
      break;
    }

    const std::string& filename = components[0];

    // Prefer the size of an already-known file; otherwise the file must be
    // listed in abs_path_to_size.
    uint64_t actual_size;
    const std::shared_ptr<FileInfo> file_info = GetFile(filename);
    if (file_info) {
      actual_size = file_info->size;
    } else {
      std::string abs_path = backup_dir + "/" + filename;
      auto e = abs_path_to_size.find(abs_path);
      if (e == abs_path_to_size.end()) {
        return IOStatus::Corruption(
            "Pathname in meta file not found on disk: " + abs_path);
      }
      actual_size = e->second;
    }

    if (schema_major_version >= 2) {
      // File name plus (name, data) pairs -> odd number of components. This
      // also guarantees components[i + 1] exists in the loop below.
      if (components.size() % 2 != 1) {
        return IOStatus::Corruption(
            "Bad number of line components for file entry.");
      }
    } else {
      // Check restricted original schema: exactly
      // "<filename> <kFileCrc32cFieldName> <value>"
      if (components.size() < 3) {
        return IOStatus::Corruption("File checksum is missing for " + filename +
                                    " in " + meta_filename_);
      }
      if (components[1] != kFileCrc32cFieldName) {
        return IOStatus::Corruption("Unknown checksum type for " + filename +
                                    " in " + meta_filename_);
      }
      if (components.size() > 3) {
        return IOStatus::Corruption("Extra data for entry " + filename +
                                    " in " + meta_filename_);
      }
    }

    std::string checksum_hex;
    Temperature temp = Temperature::kUnknown;
    for (unsigned i = 1; i < components.size(); i += 2) {
      const std::string& field_name = components[i];
      const std::string& field_data = components[i + 1];

      if (field_name == kFileCrc32cFieldName) {
        uint32_t checksum_value =
            static_cast<uint32_t>(strtoul(field_data.c_str(), nullptr, 10));
        // Round-trip through ToString to reject anything that is not exactly
        // the decimal rendering of a uint32_t.
        if (field_data != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
          return IOStatus::Corruption("Invalid checksum value for " + filename +
                                      " in " + meta_filename_);
        }
        checksum_hex = ChecksumInt32ToHex(checksum_value);
      } else if (field_name == kFileSizeFieldName) {
        uint64_t ex_size =
            std::strtoull(field_data.c_str(), nullptr, /*base*/ 10);
        if (ex_size != actual_size) {
          return IOStatus::Corruption(
              "For file " + filename + " expected size " + ToString(ex_size) +
              " but found size " + ToString(actual_size));
        }
      } else if (field_name == kTemperatureFieldName) {
        auto iter = temperature_string_map.find(field_data);
        if (iter != temperature_string_map.end()) {
          temp = iter->second;
        } else {
          // Could report corruption, but in case of new temperatures added
          // in future, letting those map to kUnknown which should generally
          // be safe.
          temp = Temperature::kUnknown;
        }
      } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
        return IOStatus::NotSupported("Unrecognized non-ignorable file field " +
                                      field_name + " (from future version?)");
      } else {
        // Warn the first time we see any particular unrecognized file field
        if (reported_ignored_fields->insert("file:" + field_name).second) {
          ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup file field %s",
                         field_name.c_str());
        }
      }
    }

    files.emplace_back(new FileInfo(filename, actual_size, checksum_hex,
                                    /*id*/ "", /*sid*/ "", temp));
  }

  if (footer_present) {
    assert(schema_major_version >= 2);
    // No footer fields are recognized here: each one is either rejected
    // (non-ignorable prefix) or ignored with a one-time warning.
    while (read_line()) {
      if (line.empty()) {
        return IOStatus::Corruption("Unexpected empty line");
      }
      auto space_pos = line.find_first_of(' ');
      if (space_pos == std::string::npos) {
        return IOStatus::Corruption("Expected footer field");
      }
      std::string field_name = line.substr(0, space_pos);
      std::string field_data = line.substr(space_pos + 1);
      if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
        return IOStatus::NotSupported("Unrecognized non-ignorable field " +
                                      field_name + " (from future version?)");
      } else if (reported_ignored_fields->insert("footer:" + field_name)
                     .second) {
        // Warn the first time we see any particular unrecognized footer field
        ROCKS_LOG_WARN(info_log,
                       "Ignoring unrecognized backup meta footer field %s",
                       field_name.c_str());
      }
    }
  }

  // Read failures are reported here rather than at each ReadLine() call.
  {
    IOStatus io_s = backup_meta_reader->GetStatus();
    if (!io_s.ok()) {
      return io_s;
    }
  }

  if (num_files != files.size()) {
    return IOStatus::Corruption(
        "Inconsistent number of files or missing/incomplete header in " +
        meta_filename_);
  }

  files_.reserve(files.size());
  for (const auto& file_info : files) {
    IOStatus io_s = AddFile(file_info);
    if (!io_s.ok()) {
      return io_s;
    }
  }

  return IOStatus::OK();
}