in src/consensus/aft/raft.h [1081:1286]
// Handle an AppendEntries message received from node `from`.
//
// `r` is the message header; `data`/`size` point at the serialised payload
// for indices (r.prev_idx, r.idx], which is consumed one entry at a time.
// The whole call runs with state->lock held.
//
// Outcomes:
// - r.term older than our current view: reply FAIL.
// - Our term at r.prev_idx differs from r.prev_term: reply FAIL (carrying the
//   rejected {r.prev_term, r.prev_idx} TxID when our term there is non-zero).
// - We are retired with retirement_phase >= Completed and r.idx is past
//   retirement_committable_idx: reply FAIL.
// - r.prev_idx below commit_idx (CFT only) or above last_idx: dropped without
//   any reply.
// - Payload is malformed or an entry cannot be deserialised: reply FAIL.
// - Otherwise the newly deserialised entries are handed, together with
//   `from` and `r`, to execute_append_entries_sync.
//
// The sender's node ID is deliberately not validated: anything that passes
// the integrity check is accepted, so that entries containing dynamic
// topology changes that add this new leader can be accepted.
void recv_append_entries(
  const ccf::NodeId& from,
  AppendEntries r,
  const uint8_t* data,
  size_t size)
{
  std::unique_lock<std::mutex> guard(state->lock);
  LOG_DEBUG_FMT(
    "Received append entries: {}.{} to {}.{} (from {} in term {})",
    r.prev_term,
    r.prev_idx,
    r.term_of_idx,
    r.idx,
    from,
    r.term);

  // First, check append entries term against our own term, becoming
  // follower if necessary. A message in our own term is also routed through
  // become_aware_of_new_term while we are a Candidate.
  if (
    state->current_view == r.term &&
    leadership_state == kv::LeadershipState::Candidate)
  {
    become_aware_of_new_term(r.term);
  }
  else if (state->current_view < r.term)
  {
    become_aware_of_new_term(r.term);
  }
  else if (state->current_view > r.term)
  {
    // Reply false, since our term is later than the received term.
    LOG_INFO_FMT(
      "Recv append entries to {} from {} but our term is later ({} > {})",
      state->my_node_id,
      from,
      state->current_view,
      r.term);
    send_append_entries_response(from, AppendEntriesResponseType::FAIL);
    return;
  }

  // Second, check term consistency with the entries we have so far
  const auto prev_term = get_term_internal(r.prev_idx);
  if (prev_term != r.prev_term)
  {
    LOG_DEBUG_FMT(
      "Previous term for {} should be {}", r.prev_idx, prev_term);
    // Reply false if the log doesn't contain an entry at r.prev_idx
    // whose term is r.prev_term.
    if (prev_term == 0)
    {
      // A zero term is treated here as "no entry at r.prev_idx yet".
      LOG_DEBUG_FMT(
        "Recv append entries to {} from {} but our log does not yet "
        "contain index {}",
        state->my_node_id,
        from,
        r.prev_idx);
      send_append_entries_response(from, AppendEntriesResponseType::FAIL);
    }
    else
    {
      LOG_DEBUG_FMT(
        "Recv append entries to {} from {} but our log at {} has the wrong "
        "previous term (ours: {}, theirs: {})",
        state->my_node_id,
        from,
        r.prev_idx,
        prev_term,
        r.prev_term);
      // Tell the sender which (term, index) pair we rejected.
      const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
      send_append_entries_response(
        from, AppendEntriesResponseType::FAIL, rejected_tx);
    }
    return;
  }

  // Then check if those append entries extend past our retirement
  if (is_retired() && retirement_phase >= kv::RetirementPhase::Completed)
  {
    assert(retirement_committable_idx.has_value());
    if (r.idx > retirement_committable_idx)
    {
      send_append_entries_response(from, AppendEntriesResponseType::FAIL);
      return;
    }
  }

  // If the terms match up, it is sufficient to convince us that the sender
  // is leader in our term
  restart_election_timeout();
  if (!leader_id.has_value() || leader_id.value() != from)
  {
    leader_id = from;
    LOG_DEBUG_FMT(
      "Node {} thinks leader is {}", state->my_node_id, leader_id.value());
  }

  // Third, check index consistency, making sure entries are not in the past
  // or in the future. Both cases are dropped without sending a response.
  if (
    consensus_type == ConsensusType::CFT && r.prev_idx < state->commit_idx)
  {
    LOG_DEBUG_FMT(
      "Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
      "({})",
      state->my_node_id,
      from,
      r.prev_idx,
      state->commit_idx);
    return;
  }
  // NOTE(review): if get_term_internal returns 0 for indices beyond last_idx,
  // the prev_term check above already replied FAIL for such messages and this
  // branch only sees r.prev_term == 0 — confirm against get_term_internal.
  if (r.prev_idx > state->last_idx)
  {
    LOG_DEBUG_FMT(
      "Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
      state->my_node_id,
      from,
      r.prev_idx,
      state->last_idx);
    return;
  }

  LOG_DEBUG_FMT(
    "Recv append entries to {} from {} for index {} and previous index {}",
    state->my_node_id,
    from,
    r.idx,
    r.prev_idx);

  // On the first accepted message after is_new_follower was set, discard
  // any local suffix beyond r.prev_idx so it is replaced by the sender's
  // entries.
  if (is_new_follower)
  {
    if (state->last_idx > r.prev_idx)
    {
      LOG_DEBUG_FMT(
        "New follower received first append entries with mismatch - "
        "rolling back from {} to {}",
        state->last_idx,
        r.prev_idx);
      rollback(r.prev_idx);
    }
    else
    {
      LOG_DEBUG_FMT(
        "New follower has no conflict with prev_idx {}", r.prev_idx);
    }
    is_new_follower = false;
  }

  std::vector<
    std::tuple<std::unique_ptr<kv::AbstractExecutionWrapper>, kv::Version>>
    append_entries;

  // Finally, deserialise each entry in the batch
  for (Index i = r.prev_idx + 1; i <= r.idx; i++)
  {
    // Entries up to last_idx have already been deserialised; their payload
    // is only read past, so that the cursor reaches the next new entry.
    const bool already_held = i <= state->last_idx;
    if (!already_held)
    {
      LOG_DEBUG_FMT("Replicating on follower {}: {}", state->my_node_id, i);
    }

    std::vector<uint8_t> entry;
    try
    {
      if (already_held)
      {
        ledger->skip_entry(data, size);
      }
      else
      {
        entry = ledger->get_entry(data, size);
      }
    }
    catch (const std::logic_error& e)
    {
      // This should only fail if there is malformed data. skip_entry walks
      // the same untrusted buffer as get_entry, so it presumably fails the
      // same way on truncated input (TODO confirm); either read gets the
      // same FAIL response instead of an exception escaping this handler.
      LOG_FAIL_FMT(
        "Recv append entries to {} from {} but the data is malformed: {}",
        state->my_node_id,
        from,
        e.what());
      send_append_entries_response(from, AppendEntriesResponseType::FAIL);
      return;
    }

    if (already_held)
    {
      continue;
    }

    // Every entry is applied with expected TxID {r.term_of_idx, i}, i.e. the
    // batch is assumed to lie entirely in term r.term_of_idx.
    kv::TxID expected{r.term_of_idx, i};
    auto ds = store->apply(entry, consensus_type, public_only, expected);
    if (ds == nullptr)
    {
      LOG_FAIL_FMT(
        "Recv append entries to {} from {} but the entry could not be "
        "deserialised",
        state->my_node_id,
        from);
      send_append_entries_response(from, AppendEntriesResponseType::FAIL);
      return;
    }
    append_entries.push_back(std::make_tuple(std::move(ds), i));
  }

  execute_append_entries_sync(
    std::move(append_entries), from, std::move(r));
}