in db/log_reader.cc [62:264]
// Reads the next logical record from the log, stitching it together from one
// or more physical records returned by ReadPhysicalRecord().
//
// On success returns true and points *record at the payload: directly at the
// physical fragment for a single-fragment record, or at *scratch when the
// record was reassembled from several fragments. last_record_offset_ is set
// to the offset of the physical record that started the logical record.
//
// Returns false when no further record is produced: end of file, a bad or
// truncated header, a truncated record body at EOF, a record left over from a
// previous instance of the log (unless kSkipAnyCorruptedRecords is in
// effect), or a bad length/checksum in a recycled log under
// kTolerateCorruptedTailRecords.
//
// Both *scratch and *record are cleared on entry.
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  if (uncompress_) {
    uncompress_->Reset();
  }

  // In a clean shutdown no error is expected in the log files, and in
  // point-in-time recovery an incomplete record at the end could leave a hole
  // in the recovered data. In those two modes tail problems are reported as
  // corruption; higher layers may ignore the report when it is provable that
  // there is no hole.
  const bool report_tail_errors =
      wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
      wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery;

  // True while fragments of a multi-part logical record are being collected.
  bool assembling = false;
  // Offset of the logical record being read; 0 is only a placeholder that
  // keeps compilers from complaining about an uninitialized variable.
  uint64_t logical_offset = 0;
  Slice piece;

  // The log ended (or is treated as ended) while a fragmented record was
  // still open. This happens when the writer dies right after writing one
  // physical record but before completing the next, so the whole logical
  // record is dropped; it is only reported in the stricter recovery modes.
  auto drop_unfinished_tail = [&]() {
    if (!assembling) {
      return;
    }
    if (report_tail_errors) {
      ReportCorruption(scratch->size(), "error reading trailing data");
    }
    scratch->clear();
  };

  // A damaged physical record interrupted a fragmented record: report the
  // bytes gathered so far and start over with the next physical record.
  auto drop_interrupted_record = [&]() {
    if (!assembling) {
      return;
    }
    ReportCorruption(scratch->size(), "error in middle of record");
    assembling = false;
    scratch->clear();
  };

  for (;;) {
    // Must be captured before ReadPhysicalRecord() is called for this
    // iteration.
    const uint64_t physical_offset = end_of_buffer_offset_ - buffer_.size();
    size_t dropped_bytes = 0;
    const unsigned int record_type = ReadPhysicalRecord(&piece, &dropped_bytes);

    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        // Earlier versions of log::Writer could emit an empty kFirstType
        // record at the tail end of a block, followed by a kFullType or
        // kFirstType record at the start of the next block. Only a non-empty
        // leftover is therefore reported.
        if (assembling && !scratch->empty()) {
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        scratch->clear();
        *record = piece;
        last_record_offset_ = physical_offset;
        first_record_read_ = true;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        // Same tolerance for the old log::Writer behaviour as in the
        // full-record case above.
        if (assembling && !scratch->empty()) {
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        logical_offset = physical_offset;
        scratch->assign(piece.data(), piece.size());
        assembling = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (assembling) {
          scratch->append(piece.data(), piece.size());
        } else {
          ReportCorruption(piece.size(),
                           "missing start of fragmented record(1)");
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!assembling) {
          ReportCorruption(piece.size(),
                           "missing start of fragmented record(2)");
          break;
        }
        scratch->append(piece.data(), piece.size());
        *record = Slice(*scratch);
        last_record_offset_ = logical_offset;
        first_record_read_ = true;
        return true;

      case kBadHeader:
        if (report_tail_errors) {
          ReportCorruption(dropped_bytes, "truncated header");
        }
        FALLTHROUGH_INTENDED;
      case kEof:
        drop_unfinished_tail();
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // A record from a previous instance of the log is treated as EOF.
          drop_unfinished_tail();
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kBadRecord:
        drop_interrupted_record();
        break;

      case kBadRecordLen:
        if (eof_) {
          if (report_tail_errors) {
            ReportCorruption(dropped_bytes, "truncated record body");
          }
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kBadRecordChecksum:
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        ReportCorruption(dropped_bytes, record_type == kBadRecordLen
                                            ? "bad record length"
                                            : "checksum mismatch");
        drop_interrupted_record();
        break;

      case kSetCompressionType: {
        if (compression_type_record_read_) {
          ReportCorruption(piece.size(),
                           "read multiple SetCompressionType records");
        }
        if (first_record_read_) {
          ReportCorruption(piece.size(),
                           "SetCompressionType not the first record");
        }
        logical_offset = physical_offset;
        scratch->clear();
        last_record_offset_ = logical_offset;
        CompressionTypeRecord compression_record(kNoCompression);
        const Status decode_status = compression_record.DecodeFrom(&piece);
        if (decode_status.ok()) {
          InitCompression(compression_record);
        } else {
          // piece.size() is intentionally read after the decode attempt.
          ReportCorruption(piece.size(),
                           "could not decode SetCompressionType record");
        }
        break;
      }

      default: {
        char message[40];
        snprintf(message, sizeof(message), "unknown record type %u",
                 record_type);
        // Count the unknown fragment plus anything already gathered for an
        // open fragmented record.
        const size_t pending_bytes = assembling ? scratch->size() : 0;
        ReportCorruption(piece.size() + pending_bytes, message);
        assembling = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}