Status RaftConsensus::Start()

in src/kudu/consensus/raft_consensus.cc [346:533]


Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                            gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                            scoped_refptr<log::Log> log,
                            scoped_refptr<ITimeManager> time_manager,
                            ConsensusRoundHandler* round_handler,
                            const scoped_refptr<MetricEntity>& metric_entity,
                            Callback<void(const std::string& reason)> mark_dirty_clbk) {
  DCHECK(metric_entity);

  peer_proxy_factory_ = std::move(peer_proxy_factory);
  log_ = std::move(log);
  time_manager_ = std::move(time_manager);

  round_handler_ = DCHECK_NOTNULL(round_handler);
  mark_dirty_clbk_ = std::move(mark_dirty_clbk);

  DCHECK(peer_proxy_factory_ != nullptr);
  DCHECK(log_ != nullptr);
  DCHECK(time_manager_ != nullptr);

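  // Look up or create the metrics that this replica exposes on the tablet's
  // metric entity.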
  raft_log_truncation_counter_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_log_truncation_counter);

  term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, CurrentTerm());
  follower_memory_pressure_rejections_ =
      metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);

  num_failed_elections_metric_ =
      metric_entity->FindOrCreateGauge(&METRIC_failed_elections_since_stable_leader,
                                       failed_elections_since_stable_leader_);

  METRIC_time_since_last_leader_heartbeat.InstantiateFunctionGauge(
    metric_entity, Bind(&RaftConsensus::GetMillisSinceLastLeaderHeartbeat, Unretained(this)))
    ->AutoDetach(&metric_detacher_);

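  // Counters tracking requests that this replica handles as a Raft proxy on
  // behalf of other peers.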
  raft_proxy_num_requests_received_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_received);
  raft_proxy_num_requests_success_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_success);
  raft_proxy_num_requests_unknown_dest_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_unknown_dest);
  raft_proxy_num_requests_log_read_timeout_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_log_read_timeout);
  raft_proxy_num_requests_hops_remaining_exhausted_ =
      metric_entity->FindOrCreateCounter(&METRIC_raft_proxy_num_requests_hops_remaining_exhausted);

  // A single Raft thread pool token is shared between RaftConsensus and
  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
  // raw pointer to the token, to emphasize that RaftConsensus is responsible
  // for destroying the token.
  raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);

  // The message queue that keeps track of which operations need to be replicated
  // where.
  //
  // Note: the message queue receives a dedicated Raft thread pool token so that
  // its submissions don't block other submissions by RaftConsensus (such as
  // heartbeat processing).
  //
  // TODO(adar): the token is SERIAL to match the previous single-thread
  // observer pool behavior, but CONCURRENT may be safe here.
  unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
      metric_entity,
      log_,
      time_manager_,
      local_peer_pb_,
      routing_table_container_,
      options_.tablet_id,
      raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
      info.last_id,
      info.last_committed_id));

  // The proxy failure threshold is set to twice the leader failure timeout,
  // which is roughly 3000 ms with default settings.
  queue->SetProxyFailureThreshold(
      2 * MinimumElectionTimeout().ToMilliseconds());

  // A manager for the set of peers that actually send the operations, both to
  // remote peers and to the local WAL.
  unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id,
                                                       peer_uuid(),
                                                       peer_proxy_factory_.get(),
                                                       queue.get(),
                                                       raft_pool_token_.get(),
                                                       log_));

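  // Tracks consensus rounds that have been started but not yet committed.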
  unique_ptr<PendingRounds> pending(new PendingRounds(LogPrefixThreadSafe(), time_manager_));

  // Capture a weak_ptr in the timer callbacks so they remain safe to invoke
  // even if they outlive this RaftConsensus instance.
  weak_ptr<RaftConsensus> w = shared_from_this();
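  // Periodic timer that invokes ReportFailureDetected() if it is not snoozed
  // within the leader failure timeout (e.g. by a heartbeat from a valid leader).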
  failure_detector_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->ReportFailureDetected();
        }
      },
      MinimumElectionTimeout());

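  // One-shot timer that ends the leader transfer period once the minimum
  // election timeout has elapsed.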
  PeriodicTimer::Options opts;
  opts.one_shot = true;
  transfer_period_timer_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->EndLeaderTransferPeriod();
        }
      },
      MinimumElectionTimeout(),
      opts);

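  // The rest of startup mutates shared consensus state, so it runs under the lock.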
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
                                   << State_Name(state_);

    queue_ = std::move(queue);
    peer_manager_ = std::move(peer_manager);
    pending_ = std::move(pending);

    ClearLeaderUnlocked();

    // Our last persisted term can be higher than the term of the last persisted
    // operation (e.g. if we called an election), but the reverse should never happen.
    if (info.last_id.term() > CurrentTermUnlocked()) {
      return Status::Corruption(Substitute("Unable to start RaftConsensus: "
          "The last op in the WAL with id $0 has a term ($1) that is greater "
          "than the latest recorded term, which is $2",
          OpIdToString(info.last_id),
          info.last_id.term(),
          CurrentTermUnlocked()));
    }

    // Append any uncommitted replicate messages found during log replay to the queue.
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                   << info.orphaned_replicates.size()
                                   << " pending transactions. Active config: "
                                   << SecureShortDebugString(cmeta_->ActiveConfig());
    for (ReplicateMsg* replicate : info.orphaned_replicates) {
      ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
      RETURN_NOT_OK(StartFollowerTransactionUnlocked(replicate_ptr));
    }

    // Set the initial committed opid for the PendingRounds only after
    // appending any uncommitted replicate messages to the queue.
    pending_->SetInitialCommittedOpId(info.last_committed_id);

    // If this is the first term, expire the failure detector early so that we
    // have a fast first election; otherwise we just let the timer expire normally.
    boost::optional<MonoDelta> fd_initial_delta;
    if (CurrentTermUnlocked() == 0) {
      // The failure detector is initialized to a low value to trigger an early
      // election (unless someone else requested a vote from us first, which
      // resets the election timer).
      //
      // We do it this way, instead of immediately running an election, so that
      // there is a higher likelihood of enough servers being available when the
      // first one attempts an election. This avoids multiple election cycles on
      // startup while keeping the "waiting period" random.
      if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Consensus starting up: Expiring failure detector timer "
                                          "to make a prompt election more likely";
        fd_initial_delta = MonoDelta::FromMilliseconds(
            rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
      }
    }

    // Now assume non-leader replica duties.
    RETURN_NOT_OK(BecomeReplicaUnlocked(fd_initial_delta));

    SetStateUnlocked(kRunning);
  }

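  // A single-voter config needs no other votes, so trigger an election right
  // away rather than waiting for the failure detector to expire.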
  if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
    LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
    RETURN_NOT_OK(
        StartElection(NORMAL_ELECTION, {INITIAL_SINGLE_NODE_ELECTION,
                                        std::chrono::system_clock::now()}));
  }

  // Report becoming visible to the Master.
  MarkDirty("RaftConsensus started");

  return Status::OK();
}