in db/db_impl/db_impl_open.cc [809:1315]
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_wal_found) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (status != nullptr && status->ok()) {
*status = s;
}
}
};
mutex_.AssertHeld();
Status status;
std::unordered_map<int, VersionEdit> version_edits;
// no need to refcount because iteration is under mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
version_edits.insert({cfd->GetID(), edit});
}
int job_id = next_job_id_.fetch_add(1);
{
auto stream = event_logger_.Log();
stream << "job" << job_id << "event"
<< "recovery_started";
stream << "wal_files";
stream.StartArray();
for (auto wal_number : wal_numbers) {
stream << wal_number;
}
stream.EndArray();
}
#ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter != nullptr) {
std::map<std::string, uint32_t> cf_name_id_map;
std::map<uint32_t, uint64_t> cf_lognumber_map;
for (auto cfd : *versions_->GetColumnFamilySet()) {
cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
cf_lognumber_map.insert(
std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
}
immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
cf_name_id_map);
}
#endif
bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false;
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;
uint64_t min_wal_number = MinLogNumberToKeep();
if (!allow_2pc()) {
// In non-2pc mode, we skip WALs that do not back unflushed data.
min_wal_number =
std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is older than min log to keep #%" PRIu64,
wal_number, min_wal_number);
continue;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(wal_number);
// Open the log file
std::string fname =
LogFileName(immutable_db_options_.GetWalDir(), wal_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", wal_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode));
auto logFileDropped = [this, &fname]() {
uint64_t bytes;
if (env_->GetFileSize(fname, &bytes).ok()) {
auto info_log = immutable_db_options_.info_log.get();
ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
static_cast<int>(bytes));
}
};
if (stop_replay_by_wal_filter) {
logFileDropped();
continue;
}
std::unique_ptr<SequentialFileReader> file_reader;
{
std::unique_ptr<FSSequentialFile> file;
status = fs_->NewSequentialFile(fname,
fs_->OptimizeForLogRead(file_options_),
&file, nullptr);
if (!status.ok()) {
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
} else {
// Fail with one log file, but that's ok.
// Try next one.
continue;
}
}
file_reader.reset(new SequentialFileReader(
std::move(file), fname, immutable_db_options_.log_readahead_size,
io_tracer_));
}
// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = immutable_db_options_.info_log.get();
reporter.fname = fname.c_str();
if (!immutable_db_options_.paranoid_checks ||
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kSkipAnyCorruptedRecords) {
reporter.status = nullptr;
} else {
reporter.status = &status;
}
// We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, wal_number);
// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr);
while (!stop_replay_by_wal_filter &&
reader.ReadRecord(&record, &scratch,
immutable_db_options_.wal_recovery_mode) &&
status.ok()) {
if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) {
return status;
}
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
// In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could
// happen when we open and write to a corrupted DB, where sequence id
// will start from the last sequence id we recovered.
if (sequence == *next_sequence) {
stop_replay_for_corruption = false;
}
if (stop_replay_for_corruption) {
logFileDropped();
break;
}
}
#ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter != nullptr) {
WriteBatch new_batch;
bool batch_changed = false;
WalFilter::WalProcessingOption wal_processing_option =
immutable_db_options_.wal_filter->LogRecordFound(
wal_number, fname, batch, &new_batch, &batch_changed);
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kContinueProcessing:
// do nothing, proceeed normally
break;
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
// skip current record
continue;
case WalFilter::WalProcessingOption::kStopReplay:
// skip current record and stop replay
stop_replay_by_wal_filter = true;
continue;
case WalFilter::WalProcessingOption::kCorruptedRecord: {
status =
Status::Corruption("Corruption reported by Wal Filter ",
immutable_db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
reporter.Corruption(record.size(), status);
continue;
}
break;
}
default: {
assert(false); // unhandled case
status = Status::NotSupported(
"Unknown WalProcessingOption returned"
" by Wal Filter ",
immutable_db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
} else {
// Ignore the error with current record processing.
continue;
}
}
}
if (batch_changed) {
// Make sure that the count in the new batch is
// within the orignal count.
int new_count = WriteBatchInternal::Count(&new_batch);
int original_count = WriteBatchInternal::Count(&batch);
if (new_count > original_count) {
ROCKS_LOG_FATAL(
immutable_db_options_.info_log,
"Recovering log #%" PRIu64
" mode %d log filter %s returned "
"more records (%d) than original (%d) which is not allowed. "
"Aborting recovery.",
wal_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode),
immutable_db_options_.wal_filter->Name(), new_count,
original_count);
status = Status::NotSupported(
"More than original # of records "
"returned by Wal Filter ",
immutable_db_options_.wal_filter->Name());
return status;
}
// Set the same sequence number in the new_batch
// as the original batch.
WriteBatchInternal::SetSequence(&new_batch,
WriteBatchInternal::Sequence(&batch));
batch = new_batch;
}
}
#endif // ROCKSDB_LITE
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update.
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
// blocks that do not form coherent data
reporter.Corruption(record.size(), status);
continue;
}
if (has_valid_writes && !read_only) {
// we can do this because this is called before client has access to the
// DB and there is only a single thread operating on DB
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfd->UnrefAndTryDelete();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= wal_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*next_sequence);
}
}
}
if (!status.ok()) {
if (status.IsNotSupported()) {
// We should not treat NotSupported as corruption. It is rather a clear
// sign that we are processing a WAL that is produced by an incompatible
// version of the code.
return status;
}
if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kSkipAnyCorruptedRecords) {
// We should ignore all errors unconditionally
status = Status::OK();
} else if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
if (status.IsIOError()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"IOError during point-in-time reading log #%" PRIu64
" seq #%" PRIu64
". %s. This likely mean loss of synced WAL, "
"thus recovery fails.",
wal_number, *next_sequence,
status.ToString().c_str());
return status;
}
// We should ignore the error but not continue replaying
status = Status::OK();
stop_replay_for_corruption = true;
corrupted_wal_number = wal_number;
if (corrupted_wal_found != nullptr) {
*corrupted_wal_found = true;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Point in time recovered to log #%" PRIu64
" seq #%" PRIu64,
wal_number, *next_sequence);
} else {
assert(immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords ||
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kAbsoluteConsistency);
return status;
}
}
flush_scheduler_.Clear();
trim_history_scheduler_.Clear();
auto last_sequence = *next_sequence - 1;
if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) {
versions_->SetLastAllocatedSequence(last_sequence);
versions_->SetLastPublishedSequence(last_sequence);
versions_->SetLastSequence(last_sequence);
}
}
// Compare the corrupted log number to all columnfamily's current log number.
// Abort Open() if any column family's log number is greater than
// the corrupted log number, which means CF contains data beyond the point of
// corruption. This could during PIT recovery when the WAL is corrupted and
// some (but not all) CFs are flushed
// Exclude the PIT case where no log is dropped after the corruption point.
// This is to cover the case for empty wals after corrupted log, in which we
// don't reset stop_replay_for_corruption.
if (stop_replay_for_corruption == true &&
(immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery ||
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords)) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
// One special case cause cfd->GetLogNumber() > corrupted_wal_number but
// the CF is still consistent: If a new column family is created during
// the flush and the WAL sync fails at the same time, the new CF points to
// the new WAL but the old WAL is curropted. Since the new CF is empty, it
// is still consistent. We add the check of CF sst file size to avoid the
// false positive alert.
// Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may leads to
// the ignorance of a very rare inconsistency case caused in data
// canclation. One CF is empty due to KV deletion. But those operations
// are in the WAL. If the WAL is corrupted, the status of this CF might
// not be consistent with others. However, the consistency check will be
// bypassed due to empty CF.
// TODO: a better and complete implementation is needed to ensure strict
// consistency check in WAL recovery including hanlding the tailing
// issues.
if (cfd->GetLogNumber() > corrupted_wal_number &&
cfd->GetLiveSstFilesSize() > 0) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Column family inconsistency: SST file contains data"
" beyond the point of corruption.");
return Status::Corruption("SST file is ahead of WALs in CF " +
cfd->GetName());
}
}
}
// True if there's any data in the WALs; if not, we can skip re-processing
// them later
bool data_seen = false;
if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
const WalNumber max_wal_number = wal_numbers.back();
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
if (cfd->GetLogNumber() > max_wal_number) {
// Column family cfd has already flushed the data
// from all wals. Memtable has to be empty because
// we filter the updates based on wal_number
// (in WriteBatch::InsertInto)
assert(cfd->mem()->GetFirstSequenceNumber() == 0);
assert(edit->NumEntries() == 0);
continue;
}
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
// flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
// If flush happened in the middle of recovery (e.g. due to memtable
// being full), we flush at the end. Otherwise we'll need to record
// where we were on last flush, which make the logic complicated.
if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
if (!status.ok()) {
// Recovery failed
break;
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
versions_->LastSequence());
}
data_seen = true;
}
// Update the log number info in the version edit corresponding to this
// column family. Note that the version edits will be written to MANIFEST
// together later.
// writing wal_number in the manifest means that any log file
// with number strongly less than (wal_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered max_wal_number, we want all wals
// with numbers `<= max_wal_number` (includes this one) to be ignored
if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
edit->SetLogNumber(max_wal_number + 1);
}
}
if (status.ok()) {
// we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_wal_number + 1);
autovector<ColumnFamilyData*> cfds;
autovector<const MutableCFOptions*> cf_opts;
autovector<autovector<VersionEdit*>> edit_lists;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
cfds.push_back(cfd);
cf_opts.push_back(cfd->GetLatestMutableCFOptions());
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
edit_lists.push_back({&iter->second});
}
std::unique_ptr<VersionEdit> wal_deletion;
if (flushed) {
wal_deletion = std::make_unique<VersionEdit>();
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
}
if (!allow_2pc()) {
// In non-2pc mode, flushing the memtables of the column families
// means we can advance min_log_number_to_keep.
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
}
edit_lists.back().push_back(wal_deletion.get());
}
// write MANIFEST with update
status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
directories_.GetDbDir(),
/*new_descriptor_log=*/true);
}
}
if (status.ok()) {
if (data_seen && !flushed) {
status = RestoreAliveLogFiles(wal_numbers);
} else {
// If there's no data in the WAL, or we flushed all the data, still
// truncate the log file. If the process goes into a crash loop before
// the file is deleted, the preallocated space will never get freed.
const bool truncate = !read_only;
GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
.PermitUncheckedError();
}
}
event_logger_.Log() << "job" << job_id << "event"
<< "recovery_finished";
return status;
}