void recv_append_entries()

in src/consensus/aft/raft.h [1081:1286]


    // Handles an AppendEntries message `r` received from node `from`.
    //
    // The whole handler runs while holding state->lock. The message is
    // validated in this order:
    //   1. r.term against state->current_view (FAIL response if ours is later),
    //   2. the term we hold at r.prev_idx against r.prev_term (FAIL response
    //      on mismatch),
    //   3. for a node whose retirement has completed, r.idx against
    //      retirement_committable_idx (FAIL response if it goes beyond),
    //   4. r.prev_idx against state->commit_idx (CFT only) and
    //      state->last_idx (dropped without any response if out of range).
    // Once the message is accepted, entries with index in (r.prev_idx, r.idx]
    // are read from `data`/`size`, applied to the store, and the resulting
    // wrappers are passed on to execute_append_entries_sync().
    //
    // NOTE(review): ledger->skip_entry() and ledger->get_entry() are presumably
    // expected to advance `data`/`size` past the entry they consume - confirm
    // against the ledger interface.
    void recv_append_entries(
      const ccf::NodeId& from,
      AppendEntries r,
      const uint8_t* data,
      size_t size)
    {
      std::unique_lock<std::mutex> state_guard(state->lock);

      LOG_DEBUG_FMT(
        "Received append entries: {}.{} to {}.{} (from {} in term {})",
        r.prev_term,
        r.prev_idx,
        r.term_of_idx,
        r.idx,
        from,
        r.term);

      // The sender's node ID is intentionally not validated. Anything that
      // passed the integrity check is accepted, so that entries carrying a
      // topology change which introduces this new leader can still get in.

      // Step 1: term check. A candidate seeing a message for its own term, or
      // any node seeing a later term, becomes aware of that term. A message
      // from an earlier term is rejected.
      const bool candidate_in_same_term = state->current_view == r.term &&
        leadership_state == kv::LeadershipState::Candidate;
      if (candidate_in_same_term || state->current_view < r.term)
      {
        become_aware_of_new_term(r.term);
      }
      else if (state->current_view > r.term)
      {
        LOG_INFO_FMT(
          "Recv append entries to {} from {} but our term is later ({} > {})",
          state->my_node_id,
          from,
          state->current_view,
          r.term);
        send_append_entries_response(from, AppendEntriesResponseType::FAIL);
        return;
      }

      // Step 2: the term we have recorded at r.prev_idx must agree with the
      // term the sender claims for it.
      const auto local_prev_term = get_term_internal(r.prev_idx);
      if (local_prev_term != r.prev_term)
      {
        LOG_DEBUG_FMT(
          "Previous term for {} should be {}", r.prev_idx, local_prev_term);

        if (local_prev_term == 0)
        {
          // We have no entry at r.prev_idx at all: plain failure.
          LOG_DEBUG_FMT(
            "Recv append entries to {} from {} but our log does not yet "
            "contain index {}",
            state->my_node_id,
            from,
            r.prev_idx);
          send_append_entries_response(from, AppendEntriesResponseType::FAIL);
          return;
        }

        // We do have an entry there, but in a different term: report which
        // transaction ID was rejected alongside the failure.
        LOG_DEBUG_FMT(
          "Recv append entries to {} from {} but our log at {} has the wrong "
          "previous term (ours: {}, theirs: {})",
          state->my_node_id,
          from,
          r.prev_idx,
          local_prev_term,
          r.prev_term);
        const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
        send_append_entries_response(
          from, AppendEntriesResponseType::FAIL, rejected_tx);
        return;
      }

      // Step 3: a node whose retirement has completed refuses entries that
      // reach beyond its retirement index.
      if (is_retired() && retirement_phase >= kv::RetirementPhase::Completed)
      {
        assert(retirement_committable_idx.has_value());
        if (r.idx > retirement_committable_idx)
        {
          send_append_entries_response(from, AppendEntriesResponseType::FAIL);
          return;
        }
      }

      // Matching terms are enough to treat the sender as the leader of our
      // term: reset the election timer and record the sender if it differs
      // from the leader we knew about.
      restart_election_timeout();
      const bool leader_changed =
        !leader_id.has_value() || leader_id.value() != from;
      if (leader_changed)
      {
        leader_id = from;
        LOG_DEBUG_FMT(
          "Node {} thinks leader is {}", state->my_node_id, leader_id.value());
      }

      // Step 4: index check. Messages starting before our commit point (CFT
      // only) or after the end of our log are dropped without a response.
      if (
        consensus_type == ConsensusType::CFT && r.prev_idx < state->commit_idx)
      {
        LOG_DEBUG_FMT(
          "Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
          "({})",
          state->my_node_id,
          from,
          r.prev_idx,
          state->commit_idx);
        return;
      }

      if (r.prev_idx > state->last_idx)
      {
        LOG_DEBUG_FMT(
          "Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
          state->my_node_id,
          from,
          r.prev_idx,
          state->last_idx);
        return;
      }

      LOG_DEBUG_FMT(
        "Recv append entries to {} from {} for index {} and previous index {}",
        state->my_node_id,
        from,
        r.idx,
        r.prev_idx);

      // The first accepted message after becoming a new follower discards any
      // local suffix that extends past r.prev_idx.
      if (is_new_follower)
      {
        if (state->last_idx <= r.prev_idx)
        {
          LOG_DEBUG_FMT(
            "New follower has no conflict with prev_idx {}", r.prev_idx);
        }
        else
        {
          LOG_DEBUG_FMT(
            "New follower received first append entries with mismatch - "
            "rolling back from {} to {}",
            state->last_idx,
            r.prev_idx);
          auto rollback_target = r.prev_idx;
          rollback(rollback_target);
        }
        is_new_follower = false;
      }

      // Step 5: walk the batch, turning every entry we do not already hold
      // into an execution wrapper paired with its index.
      std::vector<
        std::tuple<std::unique_ptr<kv::AbstractExecutionWrapper>, kv::Version>>
        pending_entries;
      for (Index entry_idx = r.prev_idx + 1; entry_idx <= r.idx; ++entry_idx)
      {
        if (entry_idx <= state->last_idx)
        {
          // Already in our log: step over its payload without applying it.
          ledger->skip_entry(data, size);
          continue;
        }

        LOG_DEBUG_FMT(
          "Replicating on follower {}: {}", state->my_node_id, entry_idx);

        std::vector<uint8_t> raw_entry;
        try
        {
          raw_entry = ledger->get_entry(data, size);
        }
        catch (const std::logic_error& e)
        {
          // Extraction is only expected to throw on malformed input.
          LOG_FAIL_FMT(
            "Recv append entries to {} from {} but the data is malformed: {}",
            state->my_node_id,
            from,
            e.what());
          send_append_entries_response(from, AppendEntriesResponseType::FAIL);
          return;
        }

        kv::TxID expected_tx{r.term_of_idx, entry_idx};
        auto wrapper =
          store->apply(raw_entry, consensus_type, public_only, expected_tx);
        if (wrapper == nullptr)
        {
          LOG_FAIL_FMT(
            "Recv append entries to {} from {} but the entry could not be "
            "deserialised",
            state->my_node_id,
            from);
          send_append_entries_response(from, AppendEntriesResponseType::FAIL);
          return;
        }
        pending_entries.emplace_back(std::move(wrapper), entry_idx);
      }

      execute_append_entries_sync(
        std::move(pending_entries), from, std::move(r));
    }