// src/kudu/tools/rebalancer.h
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <map>
#include <memory>
#include <random>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gtest/gtest_prod.h>
#include "kudu/client/shared_ptr.h"
#include "kudu/tools/rebalance_algo.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
namespace client {
class KuduClient;
}
namespace tools {
class Ksck;
struct KsckResults;
// A class implementing logic for Kudu cluster rebalancing.
//
// The public surface consists of the Config structure used to parameterize
// the rebalancer, PrintStats() to report on the cluster balance information,
// and Run() to perform the rebalancing itself.
class Rebalancer {
 public:
  // Configuration parameters for the rebalancer aggregated into a struct.
  struct Config {
    // Every parameter has a default value, so this constructor also serves as
    // the default one. Since it is not marked 'explicit', a Config can also be
    // created implicitly from a std::vector<std::string> of master addresses.
    //
    // NOTE(review): presumably each parameter initializes the member field of
    // the same name; the constructor's definition is not in this header.
    Config(std::vector<std::string> master_addresses = {},
           std::vector<std::string> table_filters = {},
           size_t max_moves_per_server = 5,
           size_t max_staleness_interval_sec = 300,
           int64_t max_run_time_sec = 0,
           bool move_rf1_replicas = false,
           bool output_replica_distribution_details = false);

    // Kudu masters' RPC endpoints.
    std::vector<std::string> master_addresses;

    // Names of tables to balance. If empty, every table and the whole cluster
    // will be balanced.
    std::vector<std::string> table_filters;

    // Maximum number of move operations to run concurrently on one server.
    // An 'operation on a server' means a move operation where either source or
    // destination replica is located on the specified server.
    size_t max_moves_per_server;

    // Maximum duration of the 'staleness' interval, in seconds: the interval
    // when the rebalancer cannot make any progress in scheduling new moves and
    // no prior scheduled moves are left, even if re-synchronizing against the
    // cluster's state again and again. Such staleness usually happens in case
    // of a persistent problem with the cluster or when some unexpected
    // concurrent activity is present (such as automatic recovery of failed
    // replicas, etc.).
    size_t max_staleness_interval_sec;

    // Maximum run time, in seconds.
    //
    // NOTE(review): the constructor's default is 0; presumably that means
    // 'no limit on the run time' -- confirm against the implementation.
    int64_t max_run_time_sec;

    // Whether to move replicas of tablets with replication factor of one.
    bool move_rf1_replicas;

    // Whether Rebalancer::PrintStats() should output per-table and per-server
    // replica distribution details.
    bool output_replica_distribution_details;
  };

  // Represents a concrete move of a replica from one tablet server to another.
  // Formed logically from a TableReplicaMove by specifying a tablet for the
  // move.
  struct ReplicaMove {
    // UUID of the tablet whose replica is to be moved.
    std::string tablet_uuid;
    // UUID of the tablet server to move the replica from.
    std::string ts_uuid_from;
    // UUID of the tablet server to move the replica to.
    std::string ts_uuid_to;
  };

  // The type of the 'result_status' output parameter of Run().
  //
  // NOTE(review): the exact conditions under which each value is reported are
  // defined by Run()'s implementation, which is not in this header.
  enum class RunStatus {
    UNKNOWN,
    CLUSTER_IS_BALANCED,
    TIMED_OUT,
  };

  // A helper type: key is tablet UUID which corresponds to value.tablet_uuid.
  using MovesInProgress = std::unordered_map<std::string, ReplicaMove>;

  // Create Rebalancer object with the specified configuration.
  explicit Rebalancer(const Config& config);

  // Print the stats on the cluster balance information into the 'out' stream.
  Status PrintStats(std::ostream& out);

  // Run the rebalancing: start the process and return once the balancing
  // criteria are satisfied or if an error occurs. The number of attempted
  // moves is output into the 'moves_count' parameter (if the parameter is
  // not null). The 'result_status' output parameter cannot be null.
  Status Run(RunStatus* result_status, size_t* moves_count = nullptr);

 private:
  // Helper class to find and schedule next available rebalancing move operation
  // and track already scheduled ones.
  class Runner {
   public:
    // The 'max_moves_per_server' specifies the maximum number of operations
    // per tablet server (both the source and the destination are counted in).
    // The 'deadline' specifies the deadline for the run, 'boost::none'
    // if no timeout is set.
    Runner(size_t max_moves_per_server,
           const boost::optional<MonoTime>& deadline);

    // Initialize instance of Runner so it can run against Kudu cluster with
    // the 'master_addresses' RPC endpoints.
    Status Init(std::vector<std::string> master_addresses);

    // Load information on prescribed replica movement operations. Also,
    // populate helper containers and other auxiliary run-time structures
    // used by ScheduleNextMove(). This method is called with every batch
    // of move operations output by the rebalancing algorithm once previously
    // loaded moves have been scheduled.
    void LoadMoves(std::vector<ReplicaMove> replica_moves);

    // Schedule next replica move. The 'has_errors' and 'timed_out' output
    // parameters are named the same as those of UpdateMovesInProgressStatus().
    //
    // NOTE(review): the meaning of the return value is not documented in this
    // header; presumably 'true' means a move has been scheduled -- confirm
    // against the implementation.
    bool ScheduleNextMove(bool* has_errors, bool* timed_out);

    // Update statuses and auxiliary information on in-progress replica move
    // operations. The 'timed_out' parameter is set to 'true' if not all
    // in-progress operations were processed by the deadline specified by
    // the 'deadline_' member field. The method returns 'true' if it's necessary
    // to clear the state of the in-progress operations, i.e. 'forget'
    // those, starting from a clean state.
    bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out);

    // Accessor for the 'moves_count_' member field.
    uint32_t moves_count() const {
      return moves_count_;
    }

    // Accessor for the 'scheduled_moves_' member field: the returned reference
    // is to the Runner's own container, no copy is made.
    const MovesInProgress& scheduled_moves() const {
      return scheduled_moves_;
    }

   private:
    // Given the data in the helper containers, find the index describing
    // the next replica move and output it into the 'op_idx' parameter.
    //
    // NOTE(review): presumably returns 'true' if such a move has been found
    // and 'op_idx' has been set -- confirm against the implementation.
    bool FindNextMove(size_t* op_idx);

    // Update the helper containers once a move operation has been scheduled.
    //
    // NOTE(review): 'idx' is presumably the index of the operation in
    // 'replica_moves_', and 'is_success' whether scheduling it succeeded --
    // confirm against the implementation.
    void UpdateOnMoveScheduled(size_t idx,
                               const std::string& tablet_uuid,
                               const std::string& src_ts_uuid,
                               const std::string& dst_ts_uuid,
                               bool is_success);

    // Auxiliary method used by UpdateOnMoveScheduled() implementation.
    // The 'op_indices' parameter has the same type as the 'src_op_indices_'
    // and 'dst_op_indices_' member fields.
    void UpdateOnMoveScheduledImpl(
        size_t idx,
        const std::string& ts_uuid,
        bool is_success,
        std::unordered_map<std::string, std::set<size_t>>* op_indices);

    // Update the helper containers once a scheduled operation is complete
    // (i.e. succeeded or failed).
    void UpdateOnMoveCompleted(const std::string& ts_uuid);

    // Maximum allowed number of move operations per server. For a move
    // operation, a source replica adds +1 at the source server and the target
    // replica adds +1 at the destination server.
    const size_t max_moves_per_server_;

    // Deadline for the activity performed by the Runner class in
    // ScheduleNextMove() and UpdateMovesInProgressStatus() methods.
    const boost::optional<MonoTime> deadline_;

    // Number of successfully completed replica move operations.
    //
    // NOTE(review): Rebalancer::Run() documents its 'moves_count' output as
    // the number of *attempted* moves; whether that value is derived from
    // this counter is not visible in this header.
    uint32_t moves_count_;

    // Kudu cluster RPC end-points.
    std::vector<std::string> master_addresses_;

    // The moves to schedule.
    std::vector<ReplicaMove> replica_moves_;

    // Mapping 'tserver UUID' --> 'indices of move operations having the
    // tserver UUID (i.e. the key) as the source of the move operation'.
    std::unordered_map<std::string, std::set<size_t>> src_op_indices_;

    // Mapping 'tserver UUID' --> 'indices of move operations having the
    // tserver UUID (i.e. the key) as the destination of the move operation'.
    std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;

    // Mapping 'tserver UUID' --> 'scheduled move operations count'.
    std::unordered_map<std::string, int32_t> op_count_per_ts_;

    // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
    // just reversed 'op_count_per_ts_'. Having count as key helps with finding
    // servers with minimum number of scheduled operations while scheduling
    // replica movement operations (it's necessary to preserve the
    // 'maximum-moves-per-server' constraint while doing so).
    std::multimap<int32_t, std::string> ts_per_op_count_;

    // Information on scheduled replica movement operations; keys are
    // tablet UUIDs, values are ReplicaMove structures.
    MovesInProgress scheduled_moves_;

    // Client object to make queries to Kudu masters for various auxiliary info
    // while scheduling move operations and monitoring their status.
    client::sp::shared_ptr<client::KuduClient> client_;
  };

  // Gives KsckResultsToClusterBalanceInfoTest (a test class, judging by its
  // name) access to the private members of Rebalancer.
  friend class KsckResultsToClusterBalanceInfoTest;

  // Convert ksck results into cluster balance information suitable for the
  // input of the high-level rebalancing algorithm. The 'moves_in_progress'
  // parameter contains information on the replica moves which have been
  // scheduled by a caller and are still in progress: those are considered
  // as successfully completed and applied to the 'ksck_info' when building
  // ClusterBalanceInfo for the specified 'ksck_info' input. The result
  // cluster balance information is output into the 'cbi' parameter. The 'cbi'
  // output parameter cannot be null.
  Status KsckResultsToClusterBalanceInfo(
      const KsckResults& ksck_info,
      const MovesInProgress& moves_in_progress,
      ClusterBalanceInfo* cbi) const;

  // Get next batch of replica moves from the rebalancing algorithm.
  // Essentially, it runs ksck against the cluster and feeds the data into the
  // rebalancing algorithm along with the information on currently pending
  // replica movement operations. The information returned by the high-level
  // rebalancing algorithm is translated into particular replica movement
  // instructions, which are used to populate the 'replica_moves' parameter
  // (the container is cleared first).
  //
  // The 'moves_in_progress' parameter contains information on pending moves.
  // The results are output into 'replica_moves', which will be empty
  // if no next steps are needed to make the cluster balanced.
  Status GetNextMoves(const MovesInProgress& moves_in_progress,
                      std::vector<ReplicaMove>* replica_moves);

  // Given information from the high-level rebalancing algorithm, find
  // appropriate tablet replicas to move on the specified tablet servers.
  // The set of result UUIDs is output into the 'tablet_ids' container (note:
  // the output container is first cleared). If no suitable replicas are found,
  // 'tablet_ids' will be empty with the result status of Status::OK().
  Status FindReplicas(const TableReplicaMove& move,
                      const KsckResults& ksck_info,
                      std::vector<std::string>* tablet_ids) const;

  // Reset ksck-related fields, preparing for a fresh ksck run.
  Status ResetKsck();

  // Filter out move operations at the tablets which already have operations
  // in progress. The 'replica_moves' cannot be null.
  void FilterMoves(const MovesInProgress& scheduled_moves,
                   std::vector<ReplicaMove>* replica_moves);

  // Configuration for the rebalancer.
  const Config config_;

  // Random device and generator for selecting among multiple choices, when
  // appropriate.
  std::random_device random_device_;
  std::mt19937 random_generator_;

  // An instance of the balancing algorithm.
  TwoDimensionalGreedyAlgo algo_;

  // Auxiliary Ksck object to get information on the cluster.
  std::shared_ptr<Ksck> ksck_;
};
} // namespace tools
} // namespace kudu