Status RebalancerTool::Run()

in src/kudu/tools/rebalancer_tool.cc [168:351]


// Runs the rebalancing process against the cluster described by ksck results.
//
// The sequence of steps depends on the configuration and on the number of
// locations found in the cluster:
//   * If 'move_replicas_from_ignored_tservers' is set, replicas are first
//     moved off the healthy ignored tablet servers.
//   * If there is a single location, intra-location rebalancing runs for the
//     whole cluster (or only for the tables in 'table_filters', if any).
//   * Otherwise location-aware rebalancing runs: placement policy fixing,
//     cross-location rebalancing and per-location (intra-location)
//     rebalancing, each gated by its own configuration flag. Per-location
//     rebalancing for different locations runs concurrently in a thread pool.
//
// Parameters:
//   result_status  [out] must not be null. Set to UNKNOWN on entry, to
//                  CLUSTER_IS_BALANCED for a cluster with no tablet servers,
//                  and otherwise to whatever the last executed step reported
//                  via RunWith() (for the per-location step: the aggregated
//                  status across all locations). Stays UNKNOWN if no step is
//                  enabled.
//   moves_count    [out] optional. If non-null, it is set to the total number
//                  of replica moves performed by all steps; it is written only
//                  when this method returns OK.
//
// Returns the first error from any step. For the per-location step, the
// returned status combines the errors of every location that failed.
Status RebalancerTool::Run(RunStatus* result_status, size_t* moves_count) {
  DCHECK(result_status);
  *result_status = RunStatus::UNKNOWN;

  optional<MonoTime> deadline;
  if (config_.max_run_time_sec != 0) {
    deadline = MonoTime::Now() + MonoDelta::FromSeconds(config_.max_run_time_sec);
  }

  ClusterRawInfo raw_info;
  {
    shared_lock guard(ksck_lock_);
    RETURN_NOT_OK(KsckResultsToClusterRawInfo(
        nullopt, ksck_->results(), &raw_info));
  }

  ClusterInfo ci;
  RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));

  const auto& ts_id_by_location = ci.locality.servers_by_location;
  if (ts_id_by_location.empty()) {
    // Empty cluster: no tablet servers reported.
    if (moves_count != nullptr) {
      *moves_count = 0;
    }
    *result_status = RunStatus::CLUSTER_IS_BALANCED;
    LOG(INFO) << "no tablet servers are reported: nothing to balance";
    return Status::OK();
  }

  size_t moves_count_total = 0;
  if (config_.move_replicas_from_ignored_tservers) {
    // Move replicas from healthy ignored tservers to other healthy tservers.
    LOG(INFO) << "replacing replicas on healthy ignored tservers";
    IgnoredTserversRunner runner(
        this, config_.ignored_tservers, config_.max_moves_per_server, deadline);
    RETURN_NOT_OK(runner.Init(config_.master_addresses));
    RETURN_NOT_OK(RunWith(&runner, result_status));
    moves_count_total += runner.moves_count();
  }
  if (ts_id_by_location.size() == 1) {
    const auto& location = ts_id_by_location.cbegin()->first;
    const auto& table_filters = config_.table_filters;
    const auto& msg = table_filters.empty()
        ? "running whole-cluster rebalancing"
        : Substitute("running rebalancing for tables: $0",
                     JoinStrings(table_filters, ","));
    LOG(INFO) << msg;
    IntraLocationRunner runner(
        this, config_.ignored_tservers, config_.max_moves_per_server, deadline, location);
    RETURN_NOT_OK(runner.Init(config_.master_addresses));
    RETURN_NOT_OK(RunWith(&runner, result_status));
    moves_count_total += runner.moves_count();
  } else {
    // The essence of location-aware balancing:
    //   1. Find tablets whose replicas placed in such a way that their
    //      distribution violates the main constraint of the placement policy.
    //      For each non-conforming tablet, move its replicas to restore
    //      the placement policy restrictions. In other words, if a location has
    //      more than the majority of replicas for some tablet,
    //      move the replicas of the tablet to other locations.
    //   2. For every tablet whose replica placement does not violate the
    //      placement policy constraints, balance the load among locations.
    //   3. Balance replica distribution within every location. This is a.k.a.
    //      intra-location balancing. The intra-location balancing involves
    //      moving replicas only within location, no replicas are moved between
    //      locations.
    if (config_.run_policy_fixer) {
      // Fix placement policy violations, if any.
      LOG(INFO) << "fixing placement policy violations";
      PolicyFixer runner(
          this, config_.ignored_tservers, config_.max_moves_per_server, deadline);
      RETURN_NOT_OK(runner.Init(config_.master_addresses));
      RETURN_NOT_OK(RunWith(&runner, result_status));
      moves_count_total += runner.moves_count();
    }
    if (config_.run_cross_location_rebalancing) {
      // Run the rebalancing across locations (inter-location rebalancing).
      LOG(INFO) << "running cross-location rebalancing";
      CrossLocationRunner runner(this,
                                 config_.ignored_tservers,
                                 config_.max_moves_per_server,
                                 config_.load_imbalance_threshold,
                                 deadline);
      RETURN_NOT_OK(runner.Init(config_.master_addresses));
      RETURN_NOT_OK(RunWith(&runner, result_status));
      moves_count_total += runner.moves_count();
    }
    if (config_.run_intra_location_rebalancing) {
      // 'ts_id_by_location' cannot be empty here: the empty-cluster case
      // returned early above and 'ci' has not been modified since.
      const size_t locations_num = ts_id_by_location.size();
      DCHECK_GT(locations_num, 0);

      // Per-location results, indexed in the iteration order of
      // 'ts_id_by_location'. Each pool task writes only into its own slot.
      vector<RunStatus> location_run_status(locations_num, RunStatus::UNKNOWN);
      vector<Status> location_status(locations_num, Status::OK());
      vector<size_t> location_moves_count(locations_num, 0);
      vector<string> location_by_idx(locations_num);

      // Thread pool to run intra-location rebalancing tasks in parallel. Since
      // the location assignment provides non-intersecting sets of servers, it's
      // possible to independently move replicas within different locations.
      // The pool is automatically shutdown in its destructor. It is declared
      // after the result vectors above, so it is destroyed before them, which
      // keeps the by-reference captures below valid on early-return paths.
      unique_ptr<ThreadPool> rebalance_pool;
      RETURN_NOT_OK(ThreadPoolBuilder("intra-location-rebalancing")
                    .set_trace_metric_prefix("rebalancer")
                    .set_max_threads(
                        config_.intra_location_rebalancing_concurrency == 0
                            ? base::NumCPUs()
                            : config_.intra_location_rebalancing_concurrency)
                    .Build(&rebalance_pool));

      // Run the rebalancing within every location (intra-location rebalancing).
      size_t location_idx = 0;
      for (const auto& elem : ts_id_by_location) {
        location_by_idx[location_idx] = elem.first;
        LOG(INFO) << Substitute(
            "starting rebalancing within location '$0'", elem.first);
        RETURN_NOT_OK(rebalance_pool->Submit(
            [this, deadline, location = elem.first,
             &config = std::as_const(config_),
             &location_status = location_status[location_idx],
             &location_moves_count = location_moves_count[location_idx],
             &location_run_status = location_run_status[location_idx]]() mutable {
          IntraLocationRunner runner(this,
                                     config.ignored_tservers,
                                     config.max_moves_per_server,
                                     deadline,
                                     std::move(location));
          if (const auto& s = runner.Init(config.master_addresses); !s.ok()) {
            location_status = s;
            return;
          }
          if (const auto& s = RunWith(&runner, &location_run_status); !s.ok()) {
            location_status = s;
            return;
          }
          location_moves_count = runner.moves_count();
        }));
        ++location_idx;
      }
      // Wait for the completion of the rebalancing process in every location.
      rebalance_pool->Wait();

      // Compose the overall result of the intra-location step:
      //   * the run status is the first per-location status that is not
      //     CLUSTER_IS_BALANCED (or CLUSTER_IS_BALANCED if all are balanced);
      //   * the error status is the first per-location error, with the
      //     messages of any further errors appended to it.
      size_t location_balancing_moves = 0;
      Status status;
      RunStatus result_run_status = RunStatus::UNKNOWN;
      for (size_t idx = 0; idx < locations_num; ++idx) {
        const auto& s = location_status[idx];
        if (s.ok()) {
          const auto rs = location_run_status[idx];
          DCHECK(rs != RunStatus::UNKNOWN);
          if (result_run_status == RunStatus::UNKNOWN ||
              result_run_status == RunStatus::CLUSTER_IS_BALANCED) {
            result_run_status = rs;
          }
          location_balancing_moves += location_moves_count[idx];
        } else {
          auto s_with_location_info = s.CloneAndPrepend(Substitute(
              "location $0", location_by_idx[idx]));
          if (status.ok()) {
            // Update the overall status to be first seen non-OK status.
            status = s_with_location_info;
          } else {
            // Update the overall status to add info on next non-OK status;
            status = status.CloneAndAppend(s_with_location_info.message());
          }
        }
      }
      // Check for the status and bail out if there was an error.
      RETURN_NOT_OK(status);

      moves_count_total += location_balancing_moves;
      *result_status = result_run_status;
    }
  }
  if (moves_count != nullptr) {
    *moves_count = moves_count_total;
  }

  return Status::OK();
}