in db/version_edit.cc [415:745]
// Decodes a serialized version edit from `src` into this object.
//
// The current contents are discarded via Clear() first. The input is then
// consumed as a sequence of records, each introduced by a varint32 tag that
// selects how the following payload is parsed.
//
// Returns:
//  - an OK Status when every record parsed and the input was fully consumed;
//  - Status::Corruption("VersionEdit", <detail>) when a record fails to parse,
//    an unknown non-ignorable tag is seen, or trailing bytes remain that do
//    not form a tag;
//  - for blob-file and WAL records, whatever non-OK Status the record's own
//    DecodeFrom() produced, returned unchanged.
//
// There is no rollback: fields are assigned as records are parsed, so after a
// failure this object may hold the records decoded before the error.
Status VersionEdit::DecodeFrom(const Slice& src) {
  Clear();
#ifndef NDEBUG
  // Test-only hook: a sync point callback may set this flag to force tags
  // above kTagSafeIgnoreMask to be treated as kTagSafeIgnoreMask (see the
  // rewrite at the top of the loop below).
  // NOTE(review): the sync point name says "EncodeTo" although this is the
  // decode path; it is left untouched because callers may register under this
  // exact name -- confirm before renaming.
  bool ignore_ignorable_tags = false;
  TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:IgnoreIgnorableTags",
                           &ignore_ignorable_tags);
#endif
  Slice input = src;
  // Non-null once a record has failed to parse; names the failing element and
  // also terminates the loop.
  const char* msg = nullptr;
  uint32_t tag = 0;

  // Temporary storage for parsing. `f` is reused across new-file records; each
  // of the kNewFile/kNewFile2/kNewFile3 cases overwrites smallest, largest and
  // fd before the entry is copied into new_files_.
  int level = 0;
  FileMetaData f;
  Slice str;
  InternalKey key;

  while (msg == nullptr && GetVarint32(&input, &tag)) {
#ifndef NDEBUG
    if (ignore_ignorable_tags && tag > kTagSafeIgnoreMask) {
      tag = kTagSafeIgnoreMask;
    }
#endif
    switch (tag) {
      case kDbId:
        if (GetLengthPrefixedSlice(&input, &str)) {
          db_id_ = str.ToString();
          has_db_id_ = true;
        } else {
          msg = "db id";
        }
        break;

      case kComparator:
        if (GetLengthPrefixedSlice(&input, &str)) {
          comparator_ = str.ToString();
          has_comparator_ = true;
        } else {
          msg = "comparator name";
        }
        break;

      case kLogNumber:
        if (GetVarint64(&input, &log_number_)) {
          has_log_number_ = true;
        } else {
          msg = "log number";
        }
        break;

      case kPrevLogNumber:
        if (GetVarint64(&input, &prev_log_number_)) {
          has_prev_log_number_ = true;
        } else {
          msg = "previous log number";
        }
        break;

      case kNextFileNumber:
        if (GetVarint64(&input, &next_file_number_)) {
          has_next_file_number_ = true;
        } else {
          msg = "next file number";
        }
        break;

      case kMaxColumnFamily:
        if (GetVarint32(&input, &max_column_family_)) {
          has_max_column_family_ = true;
        } else {
          msg = "max column family";
        }
        break;

      case kMinLogNumberToKeep:
        if (GetVarint64(&input, &min_log_number_to_keep_)) {
          has_min_log_number_to_keep_ = true;
        } else {
          msg = "min log number to keep";
        }
        break;

      case kLastSequence:
        if (GetVarint64(&input, &last_sequence_)) {
          has_last_sequence_ = true;
        } else {
          msg = "last sequence number";
        }
        break;

      case kCompactPointer:
        if (GetLevel(&input, &level, &msg) &&
            GetInternalKey(&input, &key)) {
          // we don't use compact pointers anymore,
          // but we should not fail if they are still
          // in manifest
        } else {
          // GetLevel() receives &msg and may already have set it; only fill
          // in the generic message when it did not.
          if (!msg) {
            msg = "compaction pointer";
          }
        }
        break;

      case kDeletedFile: {
        uint64_t number = 0;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number)) {
          deleted_files_.insert(std::make_pair(level, number));
        } else {
          if (!msg) {
            msg = "deleted file";
          }
        }
        break;
      }

      // Record layout: level, file number, file size, smallest key,
      // largest key. The path id passed to FileDescriptor is fixed at 0.
      case kNewFile: {
        uint64_t number = 0;
        uint64_t file_size = 0;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
            GetVarint64(&input, &file_size) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest)) {
          f.fd = FileDescriptor(number, 0, file_size);
          new_files_.push_back(std::make_pair(level, f));
        } else {
          if (!msg) {
            msg = "new-file entry";
          }
        }
        break;
      }

      // Same as kNewFile, followed by the smallest and largest sequence
      // numbers.
      case kNewFile2: {
        uint64_t number = 0;
        uint64_t file_size = 0;
        SequenceNumber smallest_seqno = 0;
        SequenceNumber largest_seqno = kMaxSequenceNumber;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
            GetVarint64(&input, &file_size) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest) &&
            GetVarint64(&input, &smallest_seqno) &&
            GetVarint64(&input, &largest_seqno)) {
          f.fd = FileDescriptor(number, 0, file_size, smallest_seqno,
                                largest_seqno);
          new_files_.push_back(std::make_pair(level, f));
        } else {
          if (!msg) {
            msg = "new-file2 entry";
          }
        }
        break;
      }

      // Same as kNewFile2, with a varint32 path id between the file number
      // and the file size.
      case kNewFile3: {
        uint64_t number = 0;
        uint32_t path_id = 0;
        uint64_t file_size = 0;
        SequenceNumber smallest_seqno = 0;
        SequenceNumber largest_seqno = kMaxSequenceNumber;
        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
            GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest) &&
            GetVarint64(&input, &smallest_seqno) &&
            GetVarint64(&input, &largest_seqno)) {
          f.fd = FileDescriptor(number, path_id, file_size, smallest_seqno,
                                largest_seqno);
          new_files_.push_back(std::make_pair(level, f));
        } else {
          if (!msg) {
            msg = "new-file3 entry";
          }
        }
        break;
      }

      case kNewFile4: {
        // Parsing is delegated; the helper's return value is used directly as
        // the error message (nullptr keeps the loop going).
        msg = DecodeNewFile4From(&input);
        break;
      }

      // For the blob-file and WAL records below, a decode failure returns the
      // sub-decoder's Status immediately instead of going through `msg`.
      case kBlobFileAddition:
      case kBlobFileAddition_DEPRECATED: {
        BlobFileAddition blob_file_addition;
        const Status s = blob_file_addition.DecodeFrom(&input);
        if (!s.ok()) {
          return s;
        }

        AddBlobFile(std::move(blob_file_addition));
        break;
      }

      case kBlobFileGarbage:
      case kBlobFileGarbage_DEPRECATED: {
        BlobFileGarbage blob_file_garbage;
        const Status s = blob_file_garbage.DecodeFrom(&input);
        if (!s.ok()) {
          return s;
        }

        AddBlobFileGarbage(std::move(blob_file_garbage));
        break;
      }

      case kWalAddition: {
        WalAddition wal_addition;
        const Status s = wal_addition.DecodeFrom(&input);
        if (!s.ok()) {
          return s;
        }

        wal_additions_.emplace_back(std::move(wal_addition));
        break;
      }

      case kWalAddition2: {
        // Unlike kWalAddition, the payload is carried in a length-prefixed
        // slice and is decoded from that slice rather than from `input`.
        Slice encoded;
        if (!GetLengthPrefixedSlice(&input, &encoded)) {
          msg = "WalAddition not prefixed by length";
          break;
        }

        WalAddition wal_addition;
        const Status s = wal_addition.DecodeFrom(&encoded);
        if (!s.ok()) {
          return s;
        }

        wal_additions_.emplace_back(std::move(wal_addition));
        break;
      }

      case kWalDeletion: {
        WalDeletion wal_deletion;
        const Status s = wal_deletion.DecodeFrom(&input);
        if (!s.ok()) {
          return s;
        }

        wal_deletion_ = std::move(wal_deletion);
        break;
      }

      case kWalDeletion2: {
        // Unlike kWalDeletion, the payload is carried in a length-prefixed
        // slice and is decoded from that slice rather than from `input`.
        Slice encoded;
        if (!GetLengthPrefixedSlice(&input, &encoded)) {
          msg = "WalDeletion not prefixed by length";
          break;
        }

        WalDeletion wal_deletion;
        const Status s = wal_deletion.DecodeFrom(&encoded);
        if (!s.ok()) {
          return s;
        }

        wal_deletion_ = std::move(wal_deletion);
        break;
      }

      // In the remaining cases `msg` is known to be nullptr at the point of
      // failure (the loop condition guarantees it on entry and nothing here
      // is handed &msg), so it is assigned unconditionally.
      case kColumnFamily:
        if (!GetVarint32(&input, &column_family_)) {
          msg = "set column family id";
        }
        break;

      case kColumnFamilyAdd:
        if (GetLengthPrefixedSlice(&input, &str)) {
          is_column_family_add_ = true;
          column_family_name_ = str.ToString();
        } else {
          msg = "column family add";
        }
        break;

      case kColumnFamilyDrop:
        // Tag only; no payload follows.
        is_column_family_drop_ = true;
        break;

      case kInAtomicGroup:
        // The flag is set before the count is parsed, so it stays set even
        // when the count turns out to be unreadable.
        is_in_atomic_group_ = true;
        if (!GetVarint32(&input, &remaining_entries_)) {
          msg = "remaining entries";
        }
        break;

      case kFullHistoryTsLow:
        if (!GetLengthPrefixedSlice(&input, &str)) {
          msg = "full_history_ts_low";
        } else if (str.empty()) {
          // An empty value is rejected rather than stored.
          msg = "full_history_ts_low: empty";
        } else {
          full_history_ts_low_.assign(str.data(), str.size());
        }
        break;

      default:
        if (tag & kTagSafeIgnoreMask) {
          // Tag from future which can be safely ignored.
          // The next field must be the length of the entry.
          uint32_t field_len = 0;
          if (!GetVarint32(&input, &field_len) ||
              static_cast<size_t>(field_len) > input.size()) {
            // Either the length is unreadable or it claims more bytes than
            // remain in the input.
            msg = "safely ignoreable tag length error";
          } else {
            input.remove_prefix(static_cast<size_t>(field_len));
          }
        } else {
          msg = "unknown tag";
        }
        break;
    }
  }

  // The loop also stops when a tag varint cannot be read; if bytes are still
  // left at that point the input is malformed.
  if (msg == nullptr && !input.empty()) {
    msg = "invalid tag";
  }

  Status result;
  if (msg != nullptr) {
    result = Status::Corruption("VersionEdit", msg);
  }
  return result;
}