Status VersionSet::ProcessManifestWrites()

in db/version_set.cc [4189:4733]


// Commits a batch of queued MANIFEST writes and installs the result.
//
// Preconditions (asserted below): `mu` is held, `writers` is non-empty, and
// `writers.front()` is the writer at the head of `manifest_writers_`.
//
// Phases:
//   1. Batching (mutex held). A column-family add/drop at the head of the
//      queue is processed alone. Otherwise, consecutive queued writers that
//      are not column-family manipulations are grouped together. Writers
//      whose column family has already been dropped are skipped, and the
//      `remaining_entries_` of a partially collected atomic group is
//      corrected. One new Version is built per column family touched by
//      non-WAL edits.
//   2. Logging (mutex released). Table handlers are loaded for the new
//      Versions. A new MANIFEST is created when there is no open descriptor
//      log, when the current one exceeds `max_manifest_file_size`, or when
//      `new_descriptor_log` is set. The batched edits are then appended and
//      synced, and a newly created MANIFEST is made current via
//      SetCurrentFile().
//   3. Apply (mutex re-acquired). On success, WAL edits, column family
//      changes and the new Versions are installed. On failure, the new
//      Versions are freed, the descriptor log is dropped, and a newly created
//      MANIFEST is deleted if MANIFEST I/O itself failed.
//   4. Every queued writer from the head of `manifest_writers_` up to and
//      including the last batched writer receives the final status and is
//      marked done. The next queued writer, if any, is signaled.
//
// @param writers            Writers passed in by the caller. They are marked
//                           done but not signaled (presumably because the
//                           calling thread is the one waiting on them —
//                           confirm against LogAndApply()).
// @param mu                 DB mutex. Held on entry and on return; released
//                           while table handlers are loaded and MANIFEST I/O
//                           is performed.
// @param db_directory       Passed to SetCurrentFile() when a new MANIFEST is
//                           installed.
// @param new_descriptor_log If true, forces creation of a new MANIFEST.
// @param new_cf_options     Options of the column family being created.
//                           Required (asserted) when the head edit is a
//                           column-family add.
// @return OK on success, otherwise the error that stopped the commit. The same
//         status is stored in every completed writer.
Status VersionSet::ProcessManifestWrites(
    std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
    FSDirectory* db_directory, bool new_descriptor_log,
    const ColumnFamilyOptions* new_cf_options) {
  mu->AssertHeld();
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  // Last queued writer (inclusive) covered by this batch. Every writer from
  // the queue head up to this one is completed at the end of the function.
  ManifestWriter* last_writer = &first_writer;

  assert(!manifest_writers_.empty());
  assert(manifest_writers_.front() == &first_writer);

  // Edits to append to the MANIFEST, in write order.
  autovector<VersionEdit*> batch_edits;
  // One new Version per column family. The builder and mutable options for
  // versions[i] live at index i of `builder_guards` and
  // `mutable_cf_options_ptrs`.
  autovector<Version*> versions;
  autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
  std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;

  // Tracking `max_last_sequence` is needed to ensure we write
  // `VersionEdit::last_sequence_`s in non-decreasing order according to the
  // recovery code's requirement. It also allows us to defer updating
  // `descriptor_last_sequence_` until the apply phase, after the log phase
  // succeeds.
  SequenceNumber max_last_sequence = descriptor_last_sequence_;

  if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
    // No group commits for column family add or drop
    LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence);
    batch_edits.push_back(first_writer.edit_list.front());
  } else {
    auto it = manifest_writers_.cbegin();
    // Index in `batch_edits` of the first edit of the atomic group currently
    // being collected, or max() when the last batched edit is not part of an
    // atomic group.
    size_t group_start = std::numeric_limits<size_t>::max();
    while (it != manifest_writers_.cend()) {
      if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
        // no group commits for column family add or drop
        break;
      }
      last_writer = *(it++);
      assert(last_writer != nullptr);
      assert(last_writer->cfd != nullptr);
      if (last_writer->cfd->IsDropped()) {
        // If we detect a dropped CF at this point, and the corresponding
        // version edits belong to an atomic group, then we need to find out
        // the preceding version edits in the same atomic group, and update
        // their `remaining_entries_` member variable because we are NOT going
        // to write the version edits of the dropped CF to the MANIFEST. If we
        // don't update, then Recover can report corrupted atomic group because
        // the `remaining_entries_` do not match.
        if (!batch_edits.empty()) {
          if (batch_edits.back()->is_in_atomic_group_ &&
              batch_edits.back()->remaining_entries_ > 0) {
            assert(group_start < batch_edits.size());
            // Count the dropped writer's leading atomic-group edits, stopping
            // after the edit that closes the group (remaining_entries_ == 0).
            const auto& edit_list = last_writer->edit_list;
            size_t k = 0;
            while (k < edit_list.size()) {
              if (!edit_list[k]->is_in_atomic_group_) {
                break;
              } else if (edit_list[k]->remaining_entries_ == 0) {
                ++k;
                break;
              }
              ++k;
            }
            for (auto i = group_start; i < batch_edits.size(); ++i) {
              assert(static_cast<uint32_t>(k) <=
                     batch_edits.back()->remaining_entries_);
              batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
            }
          }
        }
        continue;
      }
      // We do a linear search on versions because versions is small.
      // TODO(yanqin) maybe consider unordered_map
      Version* version = nullptr;
      VersionBuilder* builder = nullptr;
      // Loop-invariant: the column family of the writer being batched.
      uint32_t cf_id = last_writer->cfd->GetID();
      for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
        if (versions[i]->cfd()->GetID() == cf_id) {
          version = versions[i];
          assert(!builder_guards.empty() &&
                 builder_guards.size() == versions.size());
          builder = builder_guards[i]->version_builder();
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
          break;
        }
      }
      if (version == nullptr) {
        // WAL manipulations do not need to be applied to versions.
        if (!last_writer->IsAllWalEdits()) {
          version = new Version(last_writer->cfd, this, file_options_,
                                last_writer->mutable_cf_options, io_tracer_,
                                current_version_number_++);
          versions.push_back(version);
          mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
          builder_guards.emplace_back(
              new BaseReferencedVersionBuilder(last_writer->cfd));
          builder = builder_guards.back()->version_builder();
        }
        assert(last_writer->IsAllWalEdits() || builder);
        assert(last_writer->IsAllWalEdits() || version);
        TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
                                 version);
      }
      for (const auto& e : last_writer->edit_list) {
        if (e->is_in_atomic_group_) {
          // An atomic-group edit opens a new group unless it continues a
          // group whose previous edit still has remaining entries.
          if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
              (batch_edits.back()->is_in_atomic_group_ &&
               batch_edits.back()->remaining_entries_ == 0)) {
            group_start = batch_edits.size();
          }
        } else if (group_start != std::numeric_limits<size_t>::max()) {
          group_start = std::numeric_limits<size_t>::max();
        }
        Status s = LogAndApplyHelper(last_writer->cfd, builder, e,
                                     &max_last_sequence, mu);
        if (!s.ok()) {
          // free up the allocated memory
          for (auto v : versions) {
            delete v;
          }
          return s;
        }
        batch_edits.push_back(e);
      }
    }
    for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
      assert(!builder_guards.empty() &&
             builder_guards.size() == versions.size());
      auto* builder = builder_guards[i]->version_builder();
      Status s = builder->SaveTo(versions[i]->storage_info());
      if (!s.ok()) {
        // free up the allocated memory
        for (auto v : versions) {
          delete v;
        }
        return s;
      }
    }
  }

#ifndef NDEBUG
  // Verify that version edits of atomic groups have correct
  // remaining_entries_.
  size_t k = 0;
  while (k < batch_edits.size()) {
    while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
      ++k;
    }
    if (k == batch_edits.size()) {
      break;
    }
    size_t i = k;
    while (i < batch_edits.size()) {
      if (!batch_edits[i]->is_in_atomic_group_) {
        break;
      }
      assert(i - k + batch_edits[i]->remaining_entries_ ==
             batch_edits[k]->remaining_entries_);
      if (batch_edits[i]->remaining_entries_ == 0) {
        ++i;
        break;
      }
      ++i;
    }
    assert(batch_edits[i - 1]->is_in_atomic_group_);
    assert(0 == batch_edits[i - 1]->remaining_entries_);
    std::vector<VersionEdit*> tmp;
    for (size_t j = k; j != i; ++j) {
      tmp.emplace_back(batch_edits[j]);
    }
    TEST_SYNC_POINT_CALLBACK(
        "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
    k = i;
  }
#endif  // NDEBUG

  assert(pending_manifest_file_number_ == 0);
  if (!descriptor_log_ ||
      manifest_file_size_ > db_options_->max_manifest_file_size) {
    TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
    new_descriptor_log = true;
  } else {
    pending_manifest_file_number_ = manifest_file_number_;
  }

  // Local cached copy of state variable(s). WriteCurrentStateToManifest()
  // reads its content after releasing db mutex to avoid race with
  // SwitchMemtable().
  std::unordered_map<uint32_t, MutableCFState> curr_state;
  VersionEdit wal_additions;
  if (new_descriptor_log) {
    pending_manifest_file_number_ = NewFileNumber();
    // back() below requires at least one batched edit.
    assert(!batch_edits.empty());
    batch_edits.back()->SetNextFile(next_file_number_.load());

    // if we are writing out new snapshot make sure to persist max column
    // family.
    if (column_family_set_->GetMaxColumnFamily() > 0) {
      first_writer.edit_list.front()->SetMaxColumnFamily(
          column_family_set_->GetMaxColumnFamily());
    }
    for (const auto* cfd : *column_family_set_) {
      assert(curr_state.find(cfd->GetID()) == curr_state.end());
      curr_state.emplace(std::make_pair(
          cfd->GetID(),
          MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow())));
    }

    for (const auto& wal : wals_.GetWals()) {
      wal_additions.AddWal(wal.first, wal.second);
    }
  }

  uint64_t new_manifest_file_size = 0;
  Status s;
  IOStatus io_s;
  // Status of MANIFEST file I/O only (create/append/sync). It stays OK when
  // the failure happened elsewhere, which decides below whether the new
  // MANIFEST is deleted.
  IOStatus manifest_io_status;
  {
    FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
    mu->Unlock();
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestStart");
    TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        assert(!builder_guards.empty() &&
               builder_guards.size() == versions.size());
        // mutable_cf_options_ptrs is indexed in parallel with versions.
        assert(!mutable_cf_options_ptrs.empty() &&
               mutable_cf_options_ptrs.size() == versions.size());
        ColumnFamilyData* cfd = versions[i]->cfd_;
        s = builder_guards[i]->version_builder()->LoadTableHandlers(
            cfd->internal_stats(), 1 /* max_threads */,
            true /* prefetch_index_and_filter_in_cache */,
            false /* is_initial_load */,
            mutable_cf_options_ptrs[i]->prefix_extractor,
            MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
        if (!s.ok()) {
          // Table handler loading failures are fatal only with
          // paranoid_checks; otherwise they are ignored.
          if (db_options_->paranoid_checks) {
            break;
          }
          s = Status::OK();
        }
      }
    }

    if (s.ok() && new_descriptor_log) {
      // This is fine because everything inside of this block is serialized --
      // only one thread can be here at the same time
      // create new manifest file
      ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                     pending_manifest_file_number_);
      std::string descriptor_fname =
          DescriptorFileName(dbname_, pending_manifest_file_number_);
      std::unique_ptr<FSWritableFile> descriptor_file;
      io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
                             opt_file_opts);
      if (io_s.ok()) {
        descriptor_file->SetPreallocationBlockSize(
            db_options_->manifest_preallocation_size);
        FileTypeSet tmp_set = db_options_->checksum_handoff_file_types;
        std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
            std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_,
            io_tracer_, nullptr, db_options_->listeners, nullptr,
            tmp_set.Contains(FileType::kDescriptorFile),
            tmp_set.Contains(FileType::kDescriptorFile)));
        descriptor_log_.reset(
            new log::Writer(std::move(file_writer), 0, false));
        s = WriteCurrentStateToManifest(curr_state, wal_additions,
                                        descriptor_log_.get(), io_s);
      } else {
        manifest_io_status = io_s;
        s = io_s;
      }
    }

    if (s.ok()) {
      if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
        constexpr bool update_stats = true;

        for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
          versions[i]->PrepareAppend(*mutable_cf_options_ptrs[i], update_stats);
        }
      }

      // Write new records to MANIFEST log
#ifndef NDEBUG
      size_t idx = 0;
#endif
      for (auto& e : batch_edits) {
        std::string record;
        if (!e->EncodeTo(&record)) {
          s = Status::Corruption("Unable to encode VersionEdit:" +
                                 e->DebugString(true));
          break;
        }
        TEST_KILL_RANDOM_WITH_WEIGHT("VersionSet::LogAndApply:BeforeAddRecord",
                                     REDUCE_ODDS2);
#ifndef NDEBUG
        if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
              nullptr);
          TEST_SYNC_POINT(
              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
        }
        ++idx;
#endif /* !NDEBUG */
        io_s = descriptor_log_->AddRecord(record);
        if (!io_s.ok()) {
          s = io_s;
          manifest_io_status = io_s;
          break;
        }
      }
      if (s.ok()) {
        io_s = SyncManifest(db_options_, descriptor_log_->file());
        manifest_io_status = io_s;
        TEST_SYNC_POINT_CALLBACK(
            "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
      }
      if (!io_s.ok()) {
        s = io_s;
        ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
                        s.ToString().c_str());
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok()) {
      assert(manifest_io_status.ok());
    }
    if (s.ok() && new_descriptor_log) {
      io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
                            db_directory);
      if (!io_s.ok()) {
        s = io_s;
      }
    }

    if (s.ok()) {
      // find offset in manifest file where this version is stored.
      new_manifest_file_size = descriptor_log_->file()->GetFileSize();
    }

    if (first_writer.edit_list.front()->is_column_family_drop_) {
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
    }

    LogFlush(db_options_->info_log);
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
    mu->Lock();
  }

  if (s.ok()) {
    // Apply WAL edits, DB mutex must be held.
    for (auto& e : batch_edits) {
      if (e->IsWalAddition()) {
        s = wals_.AddWals(e->GetWalAdditions());
      } else if (e->IsWalDeletion()) {
        s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
      }
      if (!s.ok()) {
        break;
      }
    }
  }

  // Record the first I/O error in io_status_; a successful I/O clears a
  // previously recorded error.
  if (!io_s.ok()) {
    if (io_status_.ok()) {
      io_status_ = io_s;
    }
  } else if (!io_status_.ok()) {
    io_status_ = io_s;
  }

  // Append the old manifest file to the obsolete_manifest_ list to be deleted
  // by PurgeObsoleteFiles later.
  if (s.ok() && new_descriptor_log) {
    obsolete_manifests_.emplace_back(
        DescriptorFileName("", manifest_file_number_));
  }

  // Install the new versions
  if (s.ok()) {
    if (first_writer.edit_list.front()->is_column_family_add_) {
      assert(batch_edits.size() == 1);
      assert(new_cf_options != nullptr);
      assert(max_last_sequence == descriptor_last_sequence_);
      CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
    } else if (first_writer.edit_list.front()->is_column_family_drop_) {
      assert(batch_edits.size() == 1);
      assert(max_last_sequence == descriptor_last_sequence_);
      first_writer.cfd->SetDropped();
      first_writer.cfd->UnrefAndTryDelete();
    } else {
      // Each version in versions corresponds to a column family.
      // For each column family, update its log number indicating that logs
      // with number smaller than this should be ignored.
      uint64_t last_min_log_number_to_keep = 0;
      for (const auto& e : batch_edits) {
        ColumnFamilyData* cfd = nullptr;
        if (!e->IsColumnFamilyManipulation()) {
          cfd = column_family_set_->GetColumnFamily(e->column_family_);
          // e would not have been added to batch_edits if its corresponding
          // column family is dropped.
          assert(cfd);
        }
        if (cfd) {
          if (e->has_log_number_ && e->log_number_ > cfd->GetLogNumber()) {
            cfd->SetLogNumber(e->log_number_);
          }
          if (e->HasFullHistoryTsLow()) {
            cfd->SetFullHistoryTsLow(e->GetFullHistoryTsLow());
          }
        }
        if (e->has_min_log_number_to_keep_) {
          last_min_log_number_to_keep =
              std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
        }
      }

      if (last_min_log_number_to_keep != 0) {
        MarkMinLogNumberToKeep(last_min_log_number_to_keep);
      }

      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        ColumnFamilyData* cfd = versions[i]->cfd_;
        AppendVersion(cfd, versions[i]);
      }
    }
    assert(max_last_sequence >= descriptor_last_sequence_);
    descriptor_last_sequence_ = max_last_sequence;
    manifest_file_number_ = pending_manifest_file_number_;
    manifest_file_size_ = new_manifest_file_size;
    prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
  } else {
    std::string version_edits;
    for (auto& e : batch_edits) {
      version_edits += ("\n" + e->DebugString(true));
    }
    ROCKS_LOG_ERROR(db_options_->info_log,
                    "Error in committing version edit to MANIFEST: %s",
                    version_edits.c_str());
    for (auto v : versions) {
      delete v;
    }
    if (manifest_io_status.ok()) {
      manifest_file_number_ = pending_manifest_file_number_;
      manifest_file_size_ = new_manifest_file_size;
    }
    // If manifest append failed for whatever reason, the file could be
    // corrupted. So we need to force the next version update to start a
    // new manifest file.
    descriptor_log_.reset();
    // If manifest operations failed, then we know the CURRENT file still
    // points to the original MANIFEST. Therefore, we can safely delete the
    // new MANIFEST.
    // If manifest operations succeeded, and we are here, then it is possible
    // that renaming tmp file to CURRENT failed.
    //
    // On local POSIX-compliant FS, the CURRENT must point to the original
    // MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
    // keep it. Future recovery will ignore this MANIFEST. It's also ok for the
    // process not to crash and continue using the db. Any future LogAndApply()
    // call will switch to a new MANIFEST and update CURRENT, still ignoring
    // this one.
    //
    // On non-local FS, it is
    // possible that the rename operation succeeded on the server (remote)
    // side, but the client somehow returns a non-ok status to RocksDB. Note
    // that this does not violate atomicity. Should we delete the new MANIFEST
    // successfully, a subsequent recovery attempt will likely see the CURRENT
    // pointing to the new MANIFEST, thus fail. We will not be able to open the
    // DB again. Therefore, if manifest operations succeed, we should keep the
    // new MANIFEST. If the process proceeds, any future LogAndApply() call
    // will switch to a new MANIFEST and update CURRENT. If user tries to
    // re-open the DB,
    // a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
    // b) CURRENT points to the original MANIFEST, and the original MANIFEST
    //    also exists.
    if (new_descriptor_log && !manifest_io_status.ok()) {
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                     "\n",
                     pending_manifest_file_number_, manifest_file_number_);
      Status manifest_del_status = env_->DeleteFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_));
      if (!manifest_del_status.ok()) {
        ROCKS_LOG_WARN(db_options_->info_log,
                       "Failed to delete manifest %" PRIu64 ": %s",
                       pending_manifest_file_number_,
                       manifest_del_status.ToString().c_str());
      }
    }
  }

  pending_manifest_file_number_ = 0;

#ifndef NDEBUG
  // This is here kind of awkwardly because there's no other consistency
  // checks on `VersionSet`'s updates for the new `Version`s. We might want
  // to move it to a dedicated function, or remove it if we gain enough
  // confidence in `descriptor_last_sequence_`.
  if (s.ok()) {
    for (const auto* v : versions) {
      const auto* vstorage = v->storage_info();
      for (int level = 0; level < vstorage->num_levels(); ++level) {
        for (const auto& file : vstorage->LevelFiles(level)) {
          assert(file->fd.largest_seqno <= descriptor_last_sequence_);
        }
      }
    }
  }
#endif  // NDEBUG

  // wake up all the waiting writers
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
    // Writers passed in via `writers` are not signaled.
    bool need_signal = true;
    for (const auto& w : writers) {
      if (&w == ready) {
        need_signal = false;
        break;
      }
    }
    ready->status = s;
    ready->done = true;
    if (ready->manifest_write_callback) {
      (ready->manifest_write_callback)(s);
    }
    if (need_signal) {
      ready->cv.Signal();
    }
    if (ready == last_writer) {
      break;
    }
  }
  // Hand the queue over to the next pending writer, if any.
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }
  return s;
}