in src/kudu/tools/rebalancer_tool.cc [168:351]
// Top-level driver for the rebalancing process. Depending on the cluster
// topology (number of locations) and configuration flags, this dispatches a
// sequence of specialized runners:
//   * IgnoredTserversRunner  -- evacuate replicas from ignored tservers
//   * IntraLocationRunner    -- balance replicas within a single location
//   * PolicyFixer            -- fix placement-policy violations
//   * CrossLocationRunner    -- balance replica load across locations
//
// On success, '*result_status' holds the final run status and, if
// 'moves_count' is non-null, '*moves_count' holds the total number of
// replica moves scheduled across all phases.
Status RebalancerTool::Run(RunStatus* result_status, size_t* moves_count) {
DCHECK(result_status);
*result_status = RunStatus::UNKNOWN;
// Optional overall deadline: unset (run until balanced) when
// max_run_time_sec is 0.
optional<MonoTime> deadline;
if (config_.max_run_time_sec != 0) {
deadline = MonoTime::Now() + MonoDelta::FromSeconds(config_.max_run_time_sec);
}
// Snapshot the cluster state from the latest ksck results; the shared lock
// guards concurrent access to 'ksck_' (read-only here).
ClusterRawInfo raw_info;
{
shared_lock guard(ksck_lock_);
RETURN_NOT_OK(KsckResultsToClusterRawInfo(
nullopt, ksck_->results(), &raw_info));
}
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
const auto& ts_id_by_location = ci.locality.servers_by_location;
if (ts_id_by_location.empty()) {
// Empty cluster: no tablet servers reported.
if (moves_count != nullptr) {
*moves_count = 0;
}
*result_status = RunStatus::CLUSTER_IS_BALANCED;
LOG(INFO) << "no tablet servers are reported: nothing to balance";
return Status::OK();
}
// Accumulates the number of moves performed by all runners below.
size_t moves_count_total = 0;
if (config_.move_replicas_from_ignored_tservers) {
// Move replicas from healthy ignored tservers to other healthy tservers.
LOG(INFO) << "replacing replicas on healthy ignored tservers";
IgnoredTserversRunner runner(
this, config_.ignored_tservers, config_.max_moves_per_server, deadline);
RETURN_NOT_OK(runner.Init(config_.master_addresses));
RETURN_NOT_OK(RunWith(&runner, result_status));
moves_count_total += runner.moves_count();
}
if (ts_id_by_location.size() == 1) {
// Single-location cluster: no cross-location phases are needed, so run
// plain intra-location rebalancing over the sole location.
const auto& location = ts_id_by_location.cbegin()->first;
const auto& table_filters = config_.table_filters;
const auto& msg = table_filters.empty()
? "running whole-cluster rebalancing"
: Substitute("running rebalancing for tables: $0",
JoinStrings(table_filters, ","));
LOG(INFO) << msg;
IntraLocationRunner runner(
this, config_.ignored_tservers, config_.max_moves_per_server, deadline, location);
RETURN_NOT_OK(runner.Init(config_.master_addresses));
RETURN_NOT_OK(RunWith(&runner, result_status));
moves_count_total += runner.moves_count();
} else {
// The essence of location-aware balancing:
// 1. Find tablets whose replicas placed in such a way that their
// distribution violates the main constraint of the placement policy.
// For each non-conforming tablet, move its replicas to restore
// the placement policy restrictions. In other words, if a location has
// more than the majority of replicas for some tablet,
// move the replicas of the tablet to other locations.
// 2. For every tablet whose replica placement does not violate the
// placement policy constraints, balance the load among locations.
// 3. Balance replica distribution within every location. This is a.k.a.
// intra-location balancing. The intra-location balancing involves
// moving replicas only within location, no replicas are moved between
// locations.
if (config_.run_policy_fixer) {
// Fix placement policy violations, if any.
LOG(INFO) << "fixing placement policy violations";
PolicyFixer runner(
this, config_.ignored_tservers, config_.max_moves_per_server, deadline);
RETURN_NOT_OK(runner.Init(config_.master_addresses));
RETURN_NOT_OK(RunWith(&runner, result_status));
moves_count_total += runner.moves_count();
}
if (config_.run_cross_location_rebalancing) {
// Run the rebalancing across locations (inter-location rebalancing).
LOG(INFO) << "running cross-location rebalancing";
CrossLocationRunner runner(this,
config_.ignored_tservers,
config_.max_moves_per_server,
config_.load_imbalance_threshold,
deadline);
RETURN_NOT_OK(runner.Init(config_.master_addresses));
RETURN_NOT_OK(RunWith(&runner, result_status));
moves_count_total += runner.moves_count();
}
// NOTE: the emptiness check is defensive -- the empty-cluster case has
// already returned early above.
if (config_.run_intra_location_rebalancing && !ts_id_by_location.empty()) {
const size_t locations_num = ts_id_by_location.size();
DCHECK_GT(locations_num, 0);
// Per-location result slots, indexed by 'location_idx'. These vectors
// are sized once and never resized, so the per-element references
// captured by the lambdas below stay valid for the tasks' lifetime.
vector<RunStatus> location_run_status(locations_num, RunStatus::UNKNOWN);
vector<Status> location_status(locations_num, Status::OK());
vector<size_t> location_moves_count(locations_num, 0);
vector<string> location_by_idx(locations_num);
// Thread pool to run intra-location rebalancing tasks in parallel. Since
// the location assignment provides non-intersecting sets of servers, it's
// possible to independently move replicas within different locations.
// The pool is automatically shutdown in its destructor.
unique_ptr<ThreadPool> rebalance_pool;
RETURN_NOT_OK(ThreadPoolBuilder("intra-location-rebalancing")
.set_trace_metric_prefix("rebalancer")
.set_max_threads(
config_.intra_location_rebalancing_concurrency == 0
? base::NumCPUs()
: config_.intra_location_rebalancing_concurrency)
.Build(&rebalance_pool));
// Run the rebalancing within every location (intra-location rebalancing).
size_t location_idx = 0;
for (const auto& elem : ts_id_by_location) {
auto location = elem.first;
// Remember the location name for composing error messages later;
// copy before the lambda's init-capture moves 'location' away.
location_by_idx[location_idx] = location;
LOG(INFO) << Substitute(
"starting rebalancing within location '$0'", location);
// Each task writes only to its own per-location slots, so the tasks
// need no extra synchronization among themselves.
RETURN_NOT_OK(rebalance_pool->Submit(
[this, deadline, location = std::move(location),
&config = std::as_const(config_),
&location_status = location_status[location_idx],
&location_moves_count = location_moves_count[location_idx],
&location_run_status = location_run_status[location_idx]]() mutable {
IntraLocationRunner runner(this,
config.ignored_tservers,
config.max_moves_per_server,
deadline,
std::move(location));
if (const auto& s = runner.Init(config.master_addresses); !s.ok()) {
location_status = s;
return;
}
if (const auto& s = RunWith(&runner, &location_run_status); !s.ok()) {
location_status = s;
return;
}
// Record the moves count only on success; on error the count
// stays 0 and the status above carries the failure.
location_moves_count = runner.moves_count();
}));
++location_idx;
}
// Wait for the completion of the rebalancing process in every location.
rebalance_pool->Wait();
size_t location_balancing_moves = 0;
Status status;
RunStatus result_run_status = RunStatus::UNKNOWN;
for (size_t location_idx = 0; location_idx < locations_num; ++location_idx) {
// This 'for' cycle scope contains logic to compose the overall status
// of the intra-location rebalancing based on the statuses of
// the individual per-location rebalancing tasks.
const auto& s = location_status[location_idx];
if (s.ok()) {
const auto rs = location_run_status[location_idx];
DCHECK(rs != RunStatus::UNKNOWN);
// Only 'upgrade' the aggregate run status from its neutral values
// (UNKNOWN / CLUSTER_IS_BALANCED); once any location reports a
// non-balanced status, it sticks.
if (result_run_status == RunStatus::UNKNOWN ||
result_run_status == RunStatus::CLUSTER_IS_BALANCED) {
result_run_status = rs;
}
location_balancing_moves += location_moves_count[location_idx];
} else {
// Prefix the error with its location so the aggregate message
// identifies which location(s) failed.
auto s_with_location_info = s.CloneAndPrepend(Substitute(
"location $0", location_by_idx[location_idx]));
if (status.ok()) {
// Update the overall status to be first seen non-OK status.
status = s_with_location_info;
} else {
// Update the overall status to add info on next non-OK status;
status = status.CloneAndAppend(s_with_location_info.message());
}
}
}
// Check for the status and bail out if there was an error.
// NOTE: on error, successful locations' moves are not added into
// 'moves_count_total' since this returns before the addition below.
RETURN_NOT_OK(status);
moves_count_total += location_balancing_moves;
*result_status = result_run_status;
}
}
if (moves_count != nullptr) {
*moves_count = moves_count_total;
}
return Status::OK();
}