in src/kv/store.h [857:1000]
/** Serialise and replicate a prepared transaction.
 *
 * The transaction is recorded in pending_txs at its version; any
 * contiguous run of pending transactions starting at last_replicated + 1
 * is then serialised, appended to the history, and handed to consensus
 * as a single batch. Transactions whose predecessors are still pending
 * stay queued until a later commit fills the gap.
 *
 * @param txid Term and version the transaction was prepared at.
 * @param pending_tx Deferred transaction body; call() produces the
 *        serialised write set, claims digest and consensus hooks.
 * @param globally_committable True for entries (e.g. signatures) that
 *        may advance the last committable index.
 *
 * @return SUCCESS when there is nothing to replicate or consensus
 *         accepted the batch; FAIL_NO_REPLICATE when the transaction's
 *         term is stale or replication was rejected.
 */
CommitResult commit(
  const TxID& txid,
  std::unique_ptr<PendingTx> pending_tx,
  bool globally_committable) override
{
  auto c = get_consensus();
  if (!c)
  {
    // No consensus instance yet (e.g. node start-up): nothing to replicate.
    return CommitResult::SUCCESS;
  }

  // Serialise whole commits: only one thread drains pending_txs at a time.
  std::lock_guard<std::mutex> cguard(commit_lock);

  LOG_DEBUG_FMT(
    "Store::commit {}{}",
    txid.version,
    (globally_committable ? " globally_committable" : ""));

  BatchVector batch;
  Version previous_last_replicated = 0;
  Version next_last_replicated = 0;
  Version previous_rollback_count = 0;
  ccf::View replication_view = 0;

  {
    std::lock_guard<std::mutex> vguard(version_lock);
    if (txid.term != term_of_next_version && consensus->is_primary())
    {
      // This can happen when a transaction started before a view change,
      // but tries to commit after the view change is complete.
      LOG_DEBUG_FMT(
        "Want to commit for term {} but term is {}",
        txid.term,
        term_of_next_version);

      return CommitResult::FAIL_NO_REPLICATE;
    }

    if (globally_committable && txid.version > last_committable)
    {
      last_committable = txid.version;
    }

    pending_txs.insert(
      {txid.version,
       std::make_pair(std::move(pending_tx), globally_committable)});

    LOG_TRACE_FMT("Inserting pending tx at {}", txid.version);

    auto h = get_history();

    // Drain the contiguous prefix of pending transactions following
    // last_replicated; stop at the first gap.
    for (Version offset = 1; true; ++offset)
    {
      auto search = pending_txs.find(last_replicated + offset);
      if (search == pending_txs.end())
      {
        LOG_TRACE_FMT(
          "Couldn't find {} = {} + {}, giving up on batch while committing "
          "{}.{}",
          last_replicated + offset,
          last_replicated,
          offset,
          txid.term,
          txid.version);
        break;
      }

      auto& [pending_tx_, committable_] = search->second;
      auto [success_, data_, claims_digest_, hooks_] = pending_tx_->call();
      auto data_shared =
        std::make_shared<std::vector<uint8_t>>(std::move(data_));
      auto hooks_shared =
        std::make_shared<kv::ConsensusHookPtrs>(std::move(hooks_));

      // NB: this cannot happen currently. Regular Tx only make it here if
      // they did succeed, and signatures cannot conflict because they
      // execute in order with a read_version that's version - 1, so even
      // two contiguous signatures are fine
      if (success_ != CommitResult::SUCCESS)
      {
        LOG_DEBUG_FMT("Failed Tx commit {}", last_replicated + offset);
      }

      if (h)
      {
        if (claims_digest_.empty())
        {
          h->append(*data_shared);
        }
        else
        {
          // Entries carrying claims are hashed with the claims digest
          // folded into the leaf.
          h->append_entry(
            ccf::entry_leaf(*data_shared, claims_digest_.value()));
        }
      }

      LOG_DEBUG_FMT(
        "Batching {} ({}) during commit of {}.{}",
        last_replicated + offset,
        data_shared->size(),
        txid.term,
        txid.version);

      batch.emplace_back(
        last_replicated + offset, data_shared, committable_, hooks_shared);
      pending_txs.erase(search);
    }

    if (batch.size() == 0)
    {
      // Nothing contiguous to replicate yet; the inserted tx stays pending.
      return CommitResult::SUCCESS;
    }

    // Snapshot state under version_lock so we can detect a concurrent
    // rollback/advance before publishing next_last_replicated below.
    previous_rollback_count = rollback_count;
    previous_last_replicated = last_replicated;
    next_last_replicated = last_replicated + batch.size();
    replication_view = term_of_next_version;

    if (consensus->type() == ConsensusType::BFT && consensus->is_backup())
    {
      // BFT backups advance immediately; they do not wait on replicate().
      last_replicated = next_last_replicated;
    }
  }

  // replicate() is called outside version_lock to avoid holding it across
  // consensus work.
  if (c->replicate(batch, replication_view))
  {
    std::lock_guard<std::mutex> vguard(version_lock);
    if (
      last_replicated == previous_last_replicated &&
      previous_rollback_count == rollback_count &&
      !(consensus->type() == ConsensusType::BFT && consensus->is_backup()))
    {
      // No rollback or concurrent advance happened meanwhile: publish the
      // new replicated watermark.
      last_replicated = next_last_replicated;
    }
    return CommitResult::SUCCESS;
  }
  else
  {
    LOG_DEBUG_FMT("Failed to replicate");
    return CommitResult::FAIL_NO_REPLICATE;
  }
}