in src/kudu/tserver/ts_tablet_manager.cc [446:623]
// Initializes the tablet manager:
//  1. Builds the thread pools used for tablet copy, txn commit, tablet
//     open/delete, txn status tablet reload, txn participant registration and
//     txn status management, and submits TxnStalenessTrackerTask() to the
//     txn-status-manager pool.
//  2. Loads the metadata of every tablet found by the FsManager, in parallel
//     on the tablet-open pool; tablets whose data state isn't READY are passed
//     to HandleNonReadyTabletOnStartup() and are not opened.
//  3. Creates and registers a replica for each remaining ("live") tablet and
//     submits an OpenTablet() task for it.
//
// 'start_tablets' is started before tablet metadata is loaded and is stopped
// here only when no tablet gets registered (presumably the OpenTablet() tasks
// stop it otherwise -- TODO confirm). '*tablets_processed' is reset to 0 and
// '*tablets_total' is set to the number of live tablets; all three are also
// handed to the OpenTablet() tasks.
//
// Returns the first error hit while building pools, loading tablet metadata
// or registering tablets. On success the manager moves to MANAGER_RUNNING.
Status TSTabletManager::Init(Timer* start_tablets,
                             std::atomic<int>* tablets_processed,
                             std::atomic<int>* tablets_total) {
  CHECK_EQ(state(), MANAGER_INITIALIZING);

  // Start the tablet copy thread pool. We set a max queue size of 0 so that if
  // the number of requests exceeds the number of threads, a
  // SERVICE_UNAVAILABLE error may be returned to the remote caller.
  RETURN_NOT_OK(ThreadPoolBuilder("tablet-copy")
                .set_max_queue_size(0)
                .set_max_threads(FLAGS_num_tablets_to_copy_simultaneously)
                .Build(&tablet_copy_pool_));
  RETURN_NOT_OK(ThreadPoolBuilder("txn-commit")
                .set_max_threads(FLAGS_txn_commit_pool_num_threads)
                .Build(&txn_commit_pool_));

  // Start the threadpools we'll use to open and delete tablets.
  // This has to be done in Init() instead of the constructor, since the
  // FsManager isn't initialized until this point.
  //
  // For the pools sized below, a flag value of 0 means "default to the number
  // of data directories".
  const auto threads_or_num_data_dirs = [this](int flag_value) -> int {
    return flag_value != 0
        ? flag_value
        : static_cast<int>(fs_manager_->GetDataRootDirs().size());
  };
  RETURN_NOT_OK(ThreadPoolBuilder("tablet-open")
                .set_max_threads(threads_or_num_data_dirs(
                    FLAGS_num_tablets_to_open_simultaneously))
                .Build(&open_tablet_pool_));
  RETURN_NOT_OK(ThreadPoolBuilder("tablet-delete")
                .set_max_threads(threads_or_num_data_dirs(
                    FLAGS_num_tablets_to_delete_simultaneously))
                .Build(&delete_tablet_pool_));
  RETURN_NOT_OK(ThreadPoolBuilder("txn-status-tablet-reload")
                .set_max_threads(threads_or_num_data_dirs(
                    FLAGS_num_txn_status_tablets_to_reload_simultaneously))
                .Build(&reload_txn_status_tablet_pool_));
  RETURN_NOT_OK(
      ThreadPoolBuilder("txn-participant-registration")
      .set_max_threads(FLAGS_txn_participant_registration_pool_num_threads)
      .Build(&txn_participant_registration_pool_));

  // TODO(aserbin): if better parallelism is needed to serve higher txn volume,
  //                consider using multiple threads in this pool and schedule
  //                per-tablet-replica clean-up tasks via threadpool serial
  //                tokens to make sure no more than one clean-up task
  //                is running against a txn status tablet replica.
  RETURN_NOT_OK(ThreadPoolBuilder("txn-status-manager")
                .set_max_threads(1)
                .set_max_queue_size(0)
                .Build(&txn_status_manager_pool_));
  RETURN_NOT_OK(txn_status_manager_pool_->Submit([this]() {
    this->TxnStalenessTrackerTask();
  }));

  start_tablets->Start();

  // Search for tablets in the metadata dir.
  vector<string> tablet_ids;
  RETURN_NOT_OK(fs_manager_->ListTabletIds(&tablet_ids));

  InitLocalRaftPeerPB();

  // metas[i] is set only for tablet_ids[i] whose data state is READY; the
  // other slots stay null.
  vector<scoped_refptr<TabletMetadata>> metas(tablet_ids.size());

  // First, load all of the tablet metadata. We do this before we start
  // submitting the actual OpenTablet() tasks so that we don't have to compete
  // for disk resources, etc, with bootstrap processes and running tablets.
  {
    SCOPED_LOG_TIMING(INFO, Substitute("load tablet metadata"));
    const size_t num_tablet_ids = tablet_ids.size();
    std::atomic<int> total_loaded_count = 0;
    std::atomic<int> success_loaded_count = 0;
    std::atomic<bool> seen_error = false;
    // Written at most once, by the task that wins the 'seen_error' CAS, and
    // read only after open_tablet_pool_->Wait() below.
    Status first_error;
    Status submit_status;
    for (size_t i = 0; i < num_tablet_ids; i++) {
      if (seen_error) {
        // If seen any error, we should abort loading tablet metadata.
        break;
      }
      // Capture only this task's tablet ID, not the whole 'tablet_ids' vector:
      // copying the vector into every task made submission quadratic in the
      // number of tablets. The by-reference captures point at locals of this
      // scope, which is safe only because the pool is drained below on every
      // path out of this scope, including a failed Submit().
      submit_status = open_tablet_pool_->Submit(
          [this, i, tablet_id = tablet_ids[i], num_tablet_ids,
           &total_loaded_count, &success_loaded_count, &metas,
           &seen_error, &first_error]() {
        // Another task already failed and Init() will return that error, so
        // don't spend time loading the remaining metadata.
        if (seen_error) {
          return;
        }
        Status s;
        do {
          KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Loading tablet metadata ($0/$1 complete)",
                                                   total_loaded_count.load(), num_tablet_ids);
          scoped_refptr<TabletMetadata> meta;
          s = OpenTabletMeta(tablet_id, &meta);
          if (!s.ok()) {
            s = s.CloneAndPrepend(Substitute("could not open tablet metadata: $0", tablet_id));
            break;
          }
          total_loaded_count++;
          if (meta->tablet_data_state() != TABLET_DATA_READY) {
            // Non-READY tablets are handled here and are not opened later.
            s = HandleNonReadyTabletOnStartup(meta);
            if (!s.ok()) {
              s = s.CloneAndPrepend(Substitute("could not handle non-ready tablet: $0", tablet_id));
            }
            break;
          }
          success_loaded_count++;
          metas[i] = meta;
        } while (false);
        if (!s.ok()) {
          // Only the first failing task records its error.
          bool current_seen_error = false;
          if (seen_error.compare_exchange_strong(current_seen_error, true)) {
            first_error = s;
          }
        }
      });
      if (!submit_status.ok()) {
        break;
      }
    }
    // Always drain the pool before leaving this scope: the submitted tasks
    // reference the locals declared above.
    open_tablet_pool_->Wait();
    RETURN_NOT_OK(submit_status);
    if (seen_error) {
      LOG_AND_RETURN(ERROR, first_error);
    }
    LOG(INFO) << Substitute("Loaded tablet metadata ($0 total tablets, $1 live tablets)",
                            total_loaded_count.load(), success_loaded_count.load());
    *tablets_total = success_loaded_count.load();
  }

  // Now submit the "Open" task for each.
  METRIC_tablets_num_total_startup.Instantiate(server_->metric_entity(), *tablets_total);
  *tablets_processed = 0;
  // Snapshot the number of live tablets before any OpenTablet() task runs; it
  // is the number of tablets the loop below will register.
  const int num_live_tablets = tablets_total->load();
  int registered_count = 0;
  if (PREDICT_TRUE(!FLAGS_tablet_bootstrap_skip_opening_tablet_for_testing)) {
    SCOPED_LOG_TIMING(INFO, Substitute("register tablets"));
    for (const auto& meta : metas) {
      // Null slots belong to tablets that weren't READY at startup.
      if (!meta.get()) {
        continue;
      }
      KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Registering tablets ($0/$1 complete)",
                                               registered_count, num_live_tablets);
      scoped_refptr<TransitionInProgressDeleter> deleter;
      {
        std::lock_guard lock(lock_);
        CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter));
      }

      scoped_refptr<TabletReplica> replica;
      RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
      RETURN_NOT_OK(open_tablet_pool_->Submit(
          [this, replica, deleter, tablets_processed, tablets_total, start_tablets]() {
            this->OpenTablet(replica, deleter, tablets_processed, tablets_total, start_tablets);
          }));
      registered_count++;
    }
    LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
  }
  if (registered_count == 0) {
    // No OpenTablet() task was submitted, so stop the startup timer here.
    start_tablets->Stop();
  }

  {
    std::lock_guard lock(lock_);
    state_ = MANAGER_RUNNING;
  }

  return Status::OK();
}