in db/write_batch.cc [460:679]
// Decodes the serialized records held in wb->rep_ over the byte range
// [begin, end) and hands each one to the matching callback on `handler`.
//
// Outcome:
//  - Corruption when the bounds are inconsistent with the size of rep_, when
//    a record carries an unrecognized tag, when a callback answers TryAgain
//    twice in a row for the same record, or when the range is the whole
//    batch (begin == kHeader, end == rep_.size()), the walk was not stopped
//    by the handler, and the number of counted records differs from
//    WriteBatchInternal::Count(wb).
//  - Any non-OK status returned by ReadRecordFromWriteBatch or left in place
//    by a callback. A single TryAgain from a callback causes that same record
//    to be dispatched once more without re-reading it.
//  - NotSupported when a transaction-begin marker does not agree with what
//    handler->WriteAfterCommit() / handler->WriteBeforePrepare() report.
//  - OK otherwise.
//
// handler->Continue() is consulted before every dispatch; when it returns
// false the walk stops and the final count check is skipped.
Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  const size_t rep_size = wb->rep_.size();
  // begin <= end <= rep_size also implies begin <= rep_size.
  const bool bounds_valid = (begin <= end) && (end <= rep_size);
  if (!bounds_valid) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }

  Slice remaining(wb->rep_.data() + begin, end - begin);
  const bool covers_whole_batch =
      (end == rep_size) && (begin == WriteBatchInternal::kHeader);

  // Fields of the most recently decoded record. They stay untouched while a
  // record is being retried so the retry sees the same data.
  Slice rec_key, rec_value, log_blob, txn_id;
  char rec_tag = 0;
  uint32_t cf_id = 0;  // default

  // A sub-batch may begin with a Noop. Such a Noop is not a batch boundary,
  // and treating it as one would mis-count the number of batches. Whether the
  // batch accumulated so far is empty is therefore tracked here and passed to
  // MarkNoop().
  bool batch_is_empty = true;
  uint32_t num_found = 0;

  Status status;
  bool prev_was_retry = false;
  bool keep_going = true;

  // Bookkeeping shared by the records that both count towards the batch total
  // and make the current (sub-)batch non-empty.
  const auto record_applied = [&]() {
    batch_is_empty = false;
    ++num_found;
  };

  while ((status.ok() && !remaining.empty()) ||
         UNLIKELY(status.IsTryAgain())) {
    keep_going = handler->Continue();
    if (!keep_going) {
      break;
    }

    if (UNLIKELY(status.IsTryAgain())) {
      // The previous callback asked for a retry: dispatch the same record
      // again, but refuse to do so twice in a row.
      assert(!prev_was_retry);  // to detect infinite loop bugs
      if (UNLIKELY(prev_was_retry)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      prev_was_retry = true;
      status = Status::OK();
    } else {
      // Normal path: decode the next record from the remaining input.
      prev_was_retry = false;
      rec_tag = 0;
      cf_id = 0;  // default
      status = ReadRecordFromWriteBatch(&remaining, &rec_tag, &cf_id, &rec_key,
                                        &rec_value, &log_blob, &txn_id);
      if (!status.ok()) {
        return status;
      }
    }

    switch (rec_tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        status = handler->PutCF(cf_id, rec_key, rec_value);
        if (LIKELY(status.ok())) {
          record_applied();
        }
        break;
      }

      case kTypeColumnFamilyDeletion:
      case kTypeDeletion: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        status = handler->DeleteCF(cf_id, rec_key);
        if (LIKELY(status.ok())) {
          record_applied();
        }
        break;
      }

      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        status = handler->SingleDeleteCF(cf_id, rec_key);
        if (LIKELY(status.ok())) {
          record_applied();
        }
        break;
      }

      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        status = handler->DeleteRangeCF(cf_id, rec_key, rec_value);
        if (LIKELY(status.ok())) {
          record_applied();
        }
        break;
      }

      case kTypeColumnFamilyMerge:
      case kTypeMerge: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        status = handler->MergeCF(cf_id, rec_key, rec_value);
        if (LIKELY(status.ok())) {
          record_applied();
        }
        break;
      }

      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        status = handler->PutBlobIndexCF(cf_id, rec_key, rec_value);
        // NOTE(review): unlike the other data records, a blob index entry is
        // counted but does not mark the batch as non-empty -- confirm this is
        // intentional.
        if (LIKELY(status.ok())) {
          ++num_found;
        }
        break;
      }

      case kTypeLogData: {
        handler->LogData(log_blob);
        // Even a batch consisting solely of LogData is a batch.
        batch_is_empty = false;
        break;
      }

      case kTypeBeginPrepareXID: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        status = handler->MarkBeginPrepare();
        assert(status.ok());
        batch_is_empty = false;
        // Both checks always run; the later one wins when both trigger.
        if (!handler->WriteAfterCommit()) {
          status = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          status = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      }

      case kTypeBeginPersistedPrepareXID: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        status = handler->MarkBeginPrepare();
        assert(status.ok());
        batch_is_empty = false;
        if (handler->WriteAfterCommit()) {
          status = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      }

      case kTypeBeginUnprepareXID: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        status = handler->MarkBeginPrepare(true /* unprepared */);
        assert(status.ok());
        batch_is_empty = false;
        // Both checks always run; the later one wins when both trigger.
        if (handler->WriteAfterCommit()) {
          status = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          status = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      }

      case kTypeEndPrepareXID: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        status = handler->MarkEndPrepare(txn_id);
        assert(status.ok());
        batch_is_empty = true;
        break;
      }

      case kTypeCommitXID: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        status = handler->MarkCommit(txn_id);
        assert(status.ok());
        batch_is_empty = true;
        break;
      }

      case kTypeCommitXIDAndTimestamp: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        // For this record the key slot carries the commit timestamp.
        assert(!rec_key.empty());
        status = handler->MarkCommitWithTimestamp(txn_id, rec_key);
        if (LIKELY(status.ok())) {
          batch_is_empty = true;
        }
        break;
      }

      case kTypeRollbackXID: {
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        status = handler->MarkRollback(txn_id);
        assert(status.ok());
        batch_is_empty = true;
        break;
      }

      case kTypeNoop: {
        // Tell the handler whether anything was accumulated before this Noop.
        status = handler->MarkNoop(batch_is_empty);
        assert(status.ok());
        batch_is_empty = true;
        break;
      }

      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }

  if (!status.ok()) {
    return status;
  }

  // The count is only verified when the full batch was walked to completion.
  const bool verify_count = keep_going && covers_whole_batch;
  if (verify_count && num_found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  return Status::OK();
}