in src/kudu/consensus/raft_consensus.cc [346:533]
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                            gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                            scoped_refptr<log::Log> log,
                            scoped_refptr<ITimeManager> time_manager,
                            ConsensusRoundHandler* round_handler,
                            const scoped_refptr<MetricEntity>& metric_entity,
                            Callback<void(const std::string& reason)> mark_dirty_clbk) {
  DCHECK(metric_entity);
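
  // Take ownership of the dependencies injected by the caller.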
  peer_proxy_factory_ = std::move(peer_proxy_factory);
  log_ = std::move(log);
  time_manager_ = std::move(time_manager);
  round_handler_ = DCHECK_NOTNULL(round_handler);
  mark_dirty_clbk_ = std::move(mark_dirty_clbk);
  DCHECK(peer_proxy_factory_ != NULL);
  DCHECK(log_ != NULL);
  DCHECK(time_manager_ != NULL);
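
  // Instantiate the metrics tracking this replica's consensus activity on the
  // tablet's metric entity.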
  raft_log_truncation_counter_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_log_truncation_counter);
  term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, CurrentTerm());
  follower_memory_pressure_rejections_ =
      metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);
  num_failed_elections_metric_ =
      metric_entity->FindOrCreateGauge(&METRIC_failed_elections_since_stable_leader,
                                       failed_elections_since_stable_leader_);
  METRIC_time_since_last_leader_heartbeat.InstantiateFunctionGauge(
      metric_entity, Bind(&RaftConsensus::GetMillisSinceLastLeaderHeartbeat, Unretained(this)))
      ->AutoDetach(&metric_detacher_);
  raft_proxy_num_requests_received_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_received);
  raft_proxy_num_requests_success_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_success);
  raft_proxy_num_requests_unknown_dest_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_unknown_dest);
  raft_proxy_num_requests_log_read_timeout_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_log_read_timeout);
  raft_proxy_num_requests_hops_remaining_exhausted_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_hops_remaining_exhausted);

  // A single Raft thread pool token is shared between RaftConsensus and
  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
  // raw pointer to the token, to emphasize that RaftConsensus is responsible
  // for destroying the token.
  raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);

  // The message queue that keeps track of which operations need to be replicated
  // where.
  //
  // Note: the message queue receives a dedicated Raft thread pool token so that
  // its submissions don't block other submissions by RaftConsensus (such as
  // heartbeat processing).
  //
  // TODO(adar): the token is SERIAL to match the previous single-thread
  // observer pool behavior, but CONCURRENT may be safe here.
  unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
      metric_entity,
      log_,
      time_manager_,
      local_peer_pb_,
      routing_table_container_,
      options_.tablet_id,
      raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
      info.last_id,
      info.last_committed_id));

  // The proxy failure threshold is set to twice the leader failure timeout
  // (MinimumElectionTimeout()), which comes to roughly 3000 ms with default
  // flag values.
  queue->SetProxyFailureThreshold(
      2 * MinimumElectionTimeout().ToMilliseconds());
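  // For example, assuming the stock defaults of --raft_heartbeat_interval_ms=500
  // and --leader_failure_max_missed_heartbeat_periods=3, the minimum election
  // timeout is 500 ms * 3 = 1500 ms, so the threshold works out to
  // 2 * 1500 ms = 3000 ms.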

  // A manager for the set of peers that actually send the operations, both to
  // remote peers and to the local WAL.
  unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id,
                                                       peer_uuid(),
                                                       peer_proxy_factory_.get(),
                                                       queue.get(),
                                                       raft_pool_token_.get(),
                                                       log_));
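
  // Tracks consensus rounds that are in flight: started, but not yet
  // committed.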
  unique_ptr<PendingRounds> pending(new PendingRounds(LogPrefixThreadSafe(), time_manager_));

  // Capture a weak_ptr reference into the functor so it can safely handle
  // outliving the consensus instance.
  weak_ptr<RaftConsensus> w = shared_from_this();
  failure_detector_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->ReportFailureDetected();
        }
      },
      MinimumElectionTimeout());
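
  // A one-shot timer used to end a leader transfer period after one election
  // timeout, in case the transfer does not complete sooner.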
  PeriodicTimer::Options opts;
  opts.one_shot = true;
  transfer_period_timer_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->EndLeaderTransferPeriod();
        }
      },
      MinimumElectionTimeout(),
      opts);
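
  // Finish initialization under the lock: install the queue, peer manager,
  // and pending rounds, re-trigger any orphaned replicates from log replay,
  // and assume non-leader replica duties.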
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
                                   << State_Name(state_);

    queue_ = std::move(queue);
    peer_manager_ = std::move(peer_manager);
    pending_ = std::move(pending);

    ClearLeaderUnlocked();

    // Our last persisted term can be higher than the term of the last
    // persisted operation (e.g. if we called an election), but the reverse
    // should never happen.
    if (info.last_id.term() > CurrentTermUnlocked()) {
      return Status::Corruption(Substitute("Unable to start RaftConsensus: "
          "The last op in the WAL with id $0 has a term ($1) that is greater "
          "than the latest recorded term, which is $2",
          OpIdToString(info.last_id),
          info.last_id.term(),
          CurrentTermUnlocked()));
    }

    // Append any uncommitted replicate messages found during log replay to the queue.
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                   << info.orphaned_replicates.size()
                                   << " pending transactions. Active config: "
                                   << SecureShortDebugString(cmeta_->ActiveConfig());
    for (ReplicateMsg* replicate : info.orphaned_replicates) {
      ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
      RETURN_NOT_OK(StartFollowerTransactionUnlocked(replicate_ptr));
    }

    // Set the initial committed opid for the PendingRounds only after
    // appending any uncommitted replicate messages to the queue.
    pending_->SetInitialCommittedOpId(info.last_committed_id);

    // If this is the first term, expire the failure detector early so that we
    // have a fast first election; otherwise just let the timer expire normally.
    boost::optional<MonoDelta> fd_initial_delta;
    if (CurrentTermUnlocked() == 0) {
      // The failure detector is initialized to a low value to trigger an early
      // election (unless someone else requested a vote from us first, which
      // resets the election timer).
      //
      // We do it this way, instead of immediately running an election, so that
      // it is more likely that enough servers are available by the time the
      // first one attempts an election. This avoids multiple election cycles
      // on startup while keeping the "waiting period" random.
      if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Consensus starting up: Expiring failure detector timer "
                                          "to make a prompt election more likely";
        fd_initial_delta = MonoDelta::FromMilliseconds(
            rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
      }
    }

    // Now assume non-leader replica duties.
    RETURN_NOT_OK(BecomeReplicaUnlocked(fd_initial_delta));

    SetStateUnlocked(kRunning);
  }

  if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
    LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
    RETURN_NOT_OK(StartElection(NORMAL_ELECTION,
                                {INITIAL_SINGLE_NODE_ELECTION,
                                 std::chrono::system_clock::now()}));
  }

  // Report becoming visible to the Master.
  MarkDirty("RaftConsensus started");

  return Status::OK();
}