Status TSTabletManager::Init()

in src/kudu/tserver/ts_tablet_manager.cc [446:623]


// Initializes the tablet manager: builds the worker thread pools, loads the
// metadata of every tablet reported by the FsManager, and submits an
// OpenTablet() task for each tablet whose data state is TABLET_DATA_READY.
//
// 'start_tablets' is started before the tablet IDs are listed. It is stopped
// here only if no tablet was registered; otherwise it is handed over to the
// submitted OpenTablet() tasks.
// 'tablets_processed' is reset to zero and handed over to the OpenTablet()
// tasks.
// 'tablets_total' is set to the number of tablets found in the
// TABLET_DATA_READY state.
//
// The manager must be in the MANAGER_INITIALIZING state when this is called
// (enforced with CHECK_EQ). On success the state becomes MANAGER_RUNNING.
//
// Returns the first non-OK status hit while building a pool, listing tablet
// IDs, loading tablet metadata, or registering a replica.
Status TSTabletManager::Init(Timer* start_tablets,
                             std::atomic<int>* tablets_processed,
                             std::atomic<int>* tablets_total) {
  CHECK_EQ(state(), MANAGER_INITIALIZING);

  // Start the tablet copy thread pool. We set a max queue size of 0 so that if
  // the number of requests exceeds the number of threads, a
  // SERVICE_UNAVAILABLE error may be returned to the remote caller.
  RETURN_NOT_OK(ThreadPoolBuilder("tablet-copy")
                .set_max_queue_size(0)
                .set_max_threads(FLAGS_num_tablets_to_copy_simultaneously)
                .Build(&tablet_copy_pool_));

  RETURN_NOT_OK(ThreadPoolBuilder("txn-commit")
                .set_max_threads(FLAGS_txn_commit_pool_num_threads)
                .Build(&txn_commit_pool_));

  // Start the threadpools we'll use to open and delete tablets.
  // This has to be done in Init() instead of the constructor, since the
  // FsManager isn't initialized until this point.
  int max_open_threads = FLAGS_num_tablets_to_open_simultaneously;
  if (max_open_threads == 0) {
    // Default to the number of data root directories.
    max_open_threads = fs_manager_->GetDataRootDirs().size();
  }
  RETURN_NOT_OK(ThreadPoolBuilder("tablet-open")
                .set_max_threads(max_open_threads)
                .Build(&open_tablet_pool_));

  int max_delete_threads = FLAGS_num_tablets_to_delete_simultaneously;
  if (max_delete_threads == 0) {
    // Default to the number of data root directories.
    max_delete_threads = fs_manager_->GetDataRootDirs().size();
  }
  RETURN_NOT_OK(ThreadPoolBuilder("tablet-delete")
                .set_max_threads(max_delete_threads)
                .Build(&delete_tablet_pool_));

  int max_reload_threads = FLAGS_num_txn_status_tablets_to_reload_simultaneously;
  if (max_reload_threads == 0) {
    // Default to the number of data root directories.
    max_reload_threads = fs_manager_->GetDataRootDirs().size();
  }
  RETURN_NOT_OK(ThreadPoolBuilder("txn-status-tablet-reload")
                .set_max_threads(max_reload_threads)
                .Build(&reload_txn_status_tablet_pool_));

  RETURN_NOT_OK(
      ThreadPoolBuilder("txn-participant-registration")
      .set_max_threads(FLAGS_txn_participant_registration_pool_num_threads)
      .Build(&txn_participant_registration_pool_));

  // TODO(aserbin): if better parallelism is needed to serve higher txn volume,
  //                consider using multiple threads in this pool and schedule
  //                per-tablet-replica clean-up tasks via threadpool serial
  //                tokens to make sure no more than one clean-up task
  //                is running against a txn status tablet replica.
  RETURN_NOT_OK(ThreadPoolBuilder("txn-status-manager")
                .set_max_threads(1)
                .set_max_queue_size(0)
                .Build(&txn_status_manager_pool_));
  RETURN_NOT_OK(txn_status_manager_pool_->Submit([this]() {
    this->TxnStalenessTrackerTask();
  }));

  start_tablets->Start();

  // Search for tablets in the metadata dir.
  vector<string> tablet_ids;
  RETURN_NOT_OK(fs_manager_->ListTabletIds(&tablet_ids));

  InitLocalRaftPeerPB();

  // One slot per tablet ID. A slot stays null if the tablet is not in the
  // TABLET_DATA_READY state, so such tablets are skipped when registering.
  vector<scoped_refptr<TabletMetadata>> metas(tablet_ids.size());

  // First, load all of the tablet metadata. We do this before we start
  // submitting the actual OpenTablet() tasks so that we don't have to compete
  // for disk resources, etc, with bootstrap processes and running tablets.
  {
    SCOPED_LOG_TIMING(INFO, Substitute("load tablet metadata"));
    std::atomic<int> total_loaded_count = 0;
    std::atomic<int> success_loaded_count = 0;
    std::atomic<bool> seen_error = false;
    Status first_error;
    // Outcome of the most recent Submit() call. A failure must not cause an
    // immediate return: tasks that were already queued hold references to the
    // locals of this scope, so the pool has to be drained first.
    Status submit_status;
    for (size_t i = 0; i < tablet_ids.size(); i++) {
      if (seen_error) {
        // If seen any error, we should abort loading tablet metadata.
        break;
      }

      // 'tablet_ids' is captured by reference to avoid copying the whole
      // vector into every task; it outlives the tasks because every exit path
      // below waits for the pool before leaving this scope.
      submit_status = open_tablet_pool_->Submit(
          [this, i, &tablet_ids, &total_loaded_count, &success_loaded_count,
           &metas, &seen_error, &first_error]() {
        const string& tablet_id = tablet_ids[i];
        Status s;
        // Single-pass loop, used so that 'break' can jump to the common error
        // handling below.
        do {
          KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Loading tablet metadata ($0/$1 complete)",
                                                   total_loaded_count.load(), tablet_ids.size());

          scoped_refptr<TabletMetadata> meta;
          s = OpenTabletMeta(tablet_id, &meta);
          if (!s.ok()) {
            s = s.CloneAndPrepend(Substitute("could not open tablet metadata: $0", tablet_id));
            break;
          }

          total_loaded_count++;

          if (meta->tablet_data_state() != TABLET_DATA_READY) {
            s = HandleNonReadyTabletOnStartup(meta);
            if (!s.ok()) {
              s = s.CloneAndPrepend(Substitute("could not handle non-ready tablet: $0", tablet_id));
            }
            // Non-ready tablets are never registered, even when handled OK.
            break;
          }

          success_loaded_count++;
          // Each task writes only its own slot, so no locking is needed.
          metas[i] = meta;
        } while (false);

        if (!s.ok()) {
          // Only the task that flips 'seen_error' from false to true records
          // its status, so 'first_error' has a single writer.
          bool current_seen_error = false;
          if (seen_error.compare_exchange_strong(current_seen_error, true)) {
            first_error = s;
          }
        }
      });
      if (!submit_status.ok()) {
        // Stop submitting; the error is returned once the pool is drained.
        break;
      }
    }
    // Wait for all submitted tasks before touching (or destroying) the locals
    // they reference.
    open_tablet_pool_->Wait();
    RETURN_NOT_OK(submit_status);
    if (seen_error) {
      LOG_AND_RETURN(ERROR, first_error);
    }

    LOG(INFO) << Substitute("Loaded tablet metadata ($0 total tablets, $1 live tablets)",
                            total_loaded_count.load(), success_loaded_count.load());
    *tablets_total = success_loaded_count.load();
  }

  // Now submit the "Open" task for each.
  METRIC_tablets_num_total_startup.Instantiate(server_->metric_entity(), *tablets_total);
  *tablets_processed = 0;
  int registered_count = 0;
  if (PREDICT_TRUE(!FLAGS_tablet_bootstrap_skip_opening_tablet_for_testing)) {
    SCOPED_LOG_TIMING(INFO, Substitute("register tablets"));
    for (const auto& meta : metas) {
      if (!meta.get()) {
        // Null slot: the tablet was not in the TABLET_DATA_READY state.
        continue;
      }
      KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Registering tablets ($0/$1 complete)",
                                               registered_count, metas.size());
      scoped_refptr<TransitionInProgressDeleter> deleter;
      {
        std::lock_guard lock(lock_);
        CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter));
      }

      scoped_refptr<TabletReplica> replica;
      RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
      // The task captures only 'this', ref-counted handles and caller-provided
      // pointers, i.e. nothing that lives on this function's stack.
      RETURN_NOT_OK(open_tablet_pool_->Submit(
          [this, replica, deleter, tablets_processed, tablets_total, start_tablets]() {
            this->OpenTablet(replica, deleter, tablets_processed, tablets_total, start_tablets);
          }));
      registered_count++;
    }
    LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
  }

  // No OpenTablet() task was submitted, so the timer is stopped here.
  if (registered_count == 0) {
    start_tablets->Stop();
  }

  {
    std::lock_guard lock(lock_);
    state_ = MANAGER_RUNNING;
  }

  return Status::OK();
}