CommitResult commit()

in src/kv/store.h [857:1000]


    /**
     * Queue a pending transaction and pass every contiguous run of pending
     * transactions that directly follows last_replicated to consensus.
     *
     * When there is no consensus this does nothing and reports success.
     * Otherwise pending_tx is stored under txid.version. Then, starting at
     * last_replicated + 1, each consecutively numbered pending entry is
     * executed through PendingTx::call(), appended to the history (when one
     * is present), moved into a batch and removed from pending_txs. The batch
     * is finally handed to replicate() outside of version_lock.
     *
     * @param txid Term and version this transaction is committing at.
     * @param pending_tx Deferred transaction; ownership is taken.
     * @param globally_committable Stored alongside the pending entry and
     * forwarded in the batch; when set, may raise last_committable to
     * txid.version.
     *
     * @return CommitResult::SUCCESS if there is no consensus, if no
     * contiguous pending entry was available to batch, or if replicate()
     * succeeded. CommitResult::FAIL_NO_REPLICATE if txid.term differs from
     * term_of_next_version while this node is primary, or if replicate()
     * returned false.
     */
    CommitResult commit(
      const TxID& txid,
      std::unique_ptr<PendingTx> pending_tx,
      bool globally_committable) override
    {
      // Single null-checked consensus handle used for the whole call.
      // NOTE(review): assumes get_consensus() hands back the same object as
      // the `consensus` member - confirm against its definition.
      auto c = get_consensus();
      if (!c)
      {
        return CommitResult::SUCCESS;
      }

      std::lock_guard<std::mutex> cguard(commit_lock);

      LOG_DEBUG_FMT(
        "Store::commit {}{}",
        txid.version,
        (globally_committable ? " globally_committable" : ""));

      // Evaluated on demand (not cached): it is consulted both before and
      // after replicate(), and each use queries consensus afresh.
      const auto is_bft_backup = [&c]() {
        return c->type() == ConsensusType::BFT && c->is_backup();
      };

      BatchVector batch;
      Version previous_last_replicated = 0;
      Version next_last_replicated = 0;
      Version previous_rollback_count = 0;
      ccf::View replication_view = 0;

      {
        std::lock_guard<std::mutex> vguard(version_lock);
        if (txid.term != term_of_next_version && c->is_primary())
        {
          // This can happen when a transaction started before a view change,
          // but tries to commit after the view change is complete.
          LOG_DEBUG_FMT(
            "Want to commit for term {} but term is {}",
            txid.term,
            term_of_next_version);

          return CommitResult::FAIL_NO_REPLICATE;
        }

        if (globally_committable && txid.version > last_committable)
        {
          last_committable = txid.version;
        }

        pending_txs.insert(
          {txid.version,
           std::make_pair(std::move(pending_tx), globally_committable)});

        LOG_TRACE_FMT("Inserting pending tx at {}", txid.version);

        auto h = get_history();

        // Drain pending entries in strict version order, stopping at the
        // first gap after last_replicated.
        for (Version offset = 1; true; ++offset)
        {
          const Version candidate = last_replicated + offset;

          auto search = pending_txs.find(candidate);
          if (search == pending_txs.end())
          {
            LOG_TRACE_FMT(
              "Couldn't find {} = {} + {}, giving up on batch while committing "
              "{}.{}",
              candidate,
              last_replicated,
              offset,
              txid.term,
              txid.version);
            break;
          }

          auto& [pending_tx_, committable_] = search->second;
          auto [success_, data_, claims_digest_, hooks_] = pending_tx_->call();
          auto data_shared =
            std::make_shared<std::vector<uint8_t>>(std::move(data_));
          auto hooks_shared =
            std::make_shared<kv::ConsensusHookPtrs>(std::move(hooks_));

          // NB: this cannot happen currently. Regular Tx only make it here if
          // they did succeed, and signatures cannot conflict because they
          // execute in order with a read_version that's version - 1, so even
          // two contiguous signatures are fine
          if (success_ != CommitResult::SUCCESS)
          {
            LOG_DEBUG_FMT("Failed Tx commit {}", candidate);
          }

          if (h)
          {
            // Entries carrying a claims digest are appended as an explicit
            // leaf built from data and digest; others append the raw data.
            if (claims_digest_.empty())
            {
              h->append(*data_shared);
            }
            else
            {
              h->append_entry(
                ccf::entry_leaf(*data_shared, claims_digest_.value()));
            }
          }

          LOG_DEBUG_FMT(
            "Batching {} ({}) during commit of {}.{}",
            candidate,
            data_shared->size(),
            txid.term,
            txid.version);

          batch.emplace_back(
            candidate, data_shared, committable_, hooks_shared);
          pending_txs.erase(search);
        }

        if (batch.size() == 0)
        {
          return CommitResult::SUCCESS;
        }

        // Snapshot state so that, once replicate() returns, last_replicated
        // is only advanced if neither it nor rollback_count changed meanwhile.
        previous_rollback_count = rollback_count;
        previous_last_replicated = last_replicated;
        next_last_replicated = last_replicated + batch.size();

        replication_view = term_of_next_version;

        // BFT backups advance last_replicated before calling replicate();
        // every other configuration advances it afterwards (see below).
        if (is_bft_backup())
        {
          last_replicated = next_last_replicated;
        }
      }

      // replicate() is deliberately called without holding version_lock.
      if (c->replicate(batch, replication_view))
      {
        std::lock_guard<std::mutex> vguard(version_lock);
        if (
          last_replicated == previous_last_replicated &&
          previous_rollback_count == rollback_count && !is_bft_backup())
        {
          last_replicated = next_last_replicated;
        }
        return CommitResult::SUCCESS;
      }
      else
      {
        LOG_DEBUG_FMT("Failed to replicate");
        return CommitResult::FAIL_NO_REPLICATE;
      }
    }