src/kudu/tools/kudu-admin-test.cc (2,019 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <algorithm> #include <atomic> #include <cstdint> #include <cstdio> #include <cstdlib> #include <deque> #include <iterator> #include <memory> #include <ostream> #include <string> #include <thread> #include <unordered_map> #include <unordered_set> #include <utility> #include <vector> #include <gflags/gflags_declare.h> #include <glog/logging.h> #include <gtest/gtest.h> #include "kudu/client/client-test-util.h" #include "kudu/client/client.h" #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" #include "kudu/common/common.pb.h" #include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/consensus.proxy.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/quorum_util.h" #include "kudu/gutil/basictypes.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/cluster_verifier.h" #include "kudu/integration-tests/test_workload.h" #include "kudu/integration-tests/ts_itest-base.h" #include "kudu/master/master.pb.h" #include "kudu/mini-cluster/external_mini_cluster.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/tablet.pb.h" #include "kudu/tools/tool_test_util.h" #include "kudu/tserver/tablet_server-test-base.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" #include "kudu/util/random.h" #include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" DECLARE_int32(num_replicas); DECLARE_int32(num_tablet_servers); using kudu::client::KuduClient; using kudu::client::KuduClientBuilder; using kudu::client::KuduColumnSchema; using kudu::client::KuduSchema; using kudu::client::KuduSchemaBuilder; using kudu::client::KuduTableAlterer; using kudu::client::KuduTableCreator; using kudu::client::sp::shared_ptr; using kudu::cluster::ExternalTabletServer; using kudu::consensus::COMMITTED_OPID; using kudu::consensus::ConsensusStatePB; using kudu::consensus::EXCLUDE_HEALTH_REPORT; using kudu::consensus::LeaderStepDownMode; using kudu::consensus::OpId; using kudu::itest::FindTabletFollowers; using kudu::itest::FindTabletLeader; using kudu::itest::GetConsensusState; using kudu::itest::StartElection; using kudu::itest::WaitUntilLeader; using kudu::itest::TabletServerMap; using kudu::itest::TServerDetails; using kudu::itest::WAIT_FOR_LEADER; using 
kudu::itest::WaitForReplicasReportedToMaster; using kudu::itest::WaitForServersToAgree; using kudu::itest::WaitUntilCommittedConfigNumVotersIs; using kudu::itest::WaitUntilCommittedOpIdIndexIs; using kudu::itest::WaitUntilTabletInState; using kudu::itest::WaitUntilTabletRunning; using kudu::master::VOTER_REPLICA; using kudu::pb_util::SecureDebugString; using kudu::tserver::ListTabletsResponsePB; using std::atomic; using std::back_inserter; using std::copy; using std::deque; using std::endl; using std::ostringstream; using std::string; using std::thread; using std::tuple; using std::unique_ptr; using std::vector; using strings::Split; using strings::Substitute; namespace kudu { class Schema; namespace tools { // Helper to format info when a tool action fails. static string ToolRunInfo(const Status& s, const string& out, const string& err) { ostringstream str; str << s.ToString() << endl; str << "stdout: " << out << endl; str << "stderr: " << err << endl; return str.str(); } // Helper macro for tool tests. Use as follows: // // ASSERT_TOOL_OK("cluster", "ksck", master_addrs); // // The failure Status result of RunKuduTool is usually useless, so this macro // also logs the stdout and stderr in case of failure, for easier diagnosis. // TODO(wdberkeley): Add a macro to retrieve stdout or stderr, or a macro for // when one of those should match a string. #define ASSERT_TOOL_OK(...) do { \ const vector<string>& _args{__VA_ARGS__}; \ string _out, _err; \ const Status& _s = RunKuduTool(_args, &_out, &_err); \ if (_s.ok()) { \ SUCCEED(); \ } else { \ FAIL() << ToolRunInfo(_s, _out, _err); \ } \ } while (0); class AdminCliTest : public tserver::TabletServerIntegrationTestBase { }; // Test config change while running a workload. // 1. Instantiate external mini cluster with 3 TS. // 2. Create table with 2 replicas. // 3. Invoke CLI to trigger a config change. // 4. Wait until the new server bootstraps. // 5. Profit! TEST_F(AdminCliTest, TestChangeConfig) { const vector<string> kMasterFlags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", "--allow_unsafe_replication_factor=true", // If running with the 3-4-3 replication scheme, the catalog manager removes // excess replicas, so it's necessary to disable that default behavior // since this test manages replicas on its own. "--catalog_manager_evict_excess_replicas=false", }; const vector<string> kTserverFlags = { "--enable_leader_failure_detection=false", }; FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 2; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); TabletServerMap active_tablet_servers; TabletServerMap::const_iterator iter = tablet_replicas_.find(tablet_id_); TServerDetails* leader = iter->second; TServerDetails* follower = (++iter)->second; InsertOrDie(&active_tablet_servers, leader->uuid(), leader); InsertOrDie(&active_tablet_servers, follower->uuid(), follower); TServerDetails* new_node = nullptr; for (TServerDetails* ts : tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_NE(nullptr, new_node); // Elect the leader (still only a consensus config size of 2). 
ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10))); ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_, 1)); TestWorkload workload(cluster_.get()); workload.set_table_name(kTableId); workload.set_timeout_allowed(true); workload.set_write_timeout_millis(10000); workload.set_num_replicas(FLAGS_num_replicas); workload.set_num_write_threads(1); workload.set_write_batch_size(1); workload.Setup(); workload.Start(); // Wait until the Master knows about the leader tserver. TServerDetails* master_observed_leader; ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); LOG(INFO) << "Adding replica at tserver with UUID " << new_node->uuid() << " as VOTER..."; ASSERT_TOOL_OK( "tablet", "change_config", "add_replica", cluster_->master()->bound_rpc_addr().ToString(), tablet_id_, new_node->uuid(), "VOTER" ); InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node); ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), leader, tablet_id_, MonoDelta::FromSeconds(10))); workload.StopAndJoin(); int num_batches = workload.batches_completed(); LOG(INFO) << "Waiting for replicas to agree..."; // Wait for all servers to replicate everything up through the last write op. // Since we don't batch, there should be at least # rows inserted log entries, // plus the initial leader's no-op, plus 1 for // the added replica for a total == #rows + 2. int min_log_index = num_batches + 2; ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_, min_log_index)); int rows_inserted = workload.rows_inserted(); LOG(INFO) << "Number of rows inserted: " << rows_inserted; ClusterVerifier v(cluster_.get()); NO_FATALS(v.CheckCluster()); NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, rows_inserted)); // Now remove the server. LOG(INFO) << "Removing replica at tserver with UUID " << new_node->uuid() << " from the config..."; ASSERT_TOOL_OK( "tablet", "change_config", "remove_replica", cluster_->master()->bound_rpc_addr().ToString(), tablet_id_, new_node->uuid() ); ASSERT_EQ(1, active_tablet_servers.erase(new_node->uuid())); ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), leader, tablet_id_, MonoDelta::FromSeconds(10))); } enum class Kudu1097 { Disable, Enable, }; enum class DownTS { None, TabletPeer, // Regression case for KUDU-2331. 
UninvolvedTS, }; class MoveTabletParamTest : public AdminCliTest, public ::testing::WithParamInterface<tuple<Kudu1097, DownTS>> { }; TEST_P(MoveTabletParamTest, Test) { const MonoDelta timeout = MonoDelta::FromSeconds(30); const auto& param = GetParam(); const auto enable_kudu_1097 = std::get<0>(param); const auto downTS = std::get<1>(param); FLAGS_num_tablet_servers = 5; FLAGS_num_replicas = 3; vector<string> ts_flags, master_flags; ts_flags = master_flags = { Substitute("--raft_prepare_replacement_before_eviction=$0", enable_kudu_1097 == Kudu1097::Enable) }; NO_FATALS(BuildAndStart(ts_flags, master_flags)); vector<string> tservers; AppendKeysFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); deque<string> active_tservers; for (auto iter = tablet_replicas_.find(tablet_id_); iter != tablet_replicas_.cend(); ++iter) { active_tservers.push_back(iter->second->uuid()); } ASSERT_EQ(FLAGS_num_replicas, active_tservers.size()); deque<string> inactive_tservers; std::sort(tservers.begin(), tservers.end()); std::sort(active_tservers.begin(), active_tservers.end()); std::set_difference(tservers.cbegin(), tservers.cend(), active_tservers.cbegin(), active_tservers.cend(), std::back_inserter(inactive_tservers)); ASSERT_EQ(FLAGS_num_tablet_servers - FLAGS_num_replicas, inactive_tservers.size()); // The workload is light (1 thread, 1 op batches) so that new replicas // bootstrap and converge quickly. TestWorkload workload(cluster_.get()); workload.set_table_name(kTableId); workload.set_num_replicas(FLAGS_num_replicas); workload.set_num_write_threads(1); workload.set_write_batch_size(1); workload.set_write_timeout_millis(timeout.ToMilliseconds()); workload.Setup(); workload.Start(); if (downTS == DownTS::TabletPeer) { // To test that the move fails if any peer is down, when downTS is // 'TabletPeer', shut down a server besides 'add' that hosts a replica. NO_FATALS(cluster_->tablet_server_by_uuid(active_tservers.back())->Shutdown()); } else if (downTS == DownTS::UninvolvedTS) { // Regression case for KUDU-2331, where move_replica would fail if any tablet // server is down, even if that tablet server was not involved in the move. NO_FATALS(cluster_->tablet_server_by_uuid(inactive_tservers.back())->Shutdown()); } // If we're not bringing down a tablet server, do 3 moves. // Assuming no ad hoc leadership changes, 3 guarantees the leader is moved at least once. int num_moves = AllowSlowTests() && (downTS == DownTS::None) ? 3 : 1; for (int i = 0; i < num_moves; i++) { const string remove = active_tservers.front(); const string add = inactive_tservers.front(); vector<string> tool_command = { "tablet", "change_config", "move_replica", cluster_->master()->bound_rpc_addr().ToString(), tablet_id_, remove, add, }; string stdout, stderr; Status s = RunKuduTool(tool_command, &stdout, &stderr); if (downTS == DownTS::TabletPeer) { ASSERT_TRUE(s.IsRuntimeError()); workload.StopAndJoin(); return; } ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); active_tservers.pop_front(); active_tservers.push_back(add); inactive_tservers.pop_front(); inactive_tservers.push_back(remove); // Allow the added server time to catch up so it applies the newest configuration. // If we don't wait, the initial ksck of move_tablet can fail with consensus conflict. 
TabletServerMap active_tservers_map; for (const string& uuid : active_tservers) { InsertOrDie(&active_tservers_map, uuid, tablet_servers_[uuid]); } ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*num_voters=*/ FLAGS_num_replicas, active_tservers_map[add], tablet_id_, timeout)); NO_FATALS(WaitUntilCommittedConfigNumMembersIs(/*num_members=*/ FLAGS_num_replicas, active_tservers_map[add], tablet_id_, timeout)); } workload.StopAndJoin(); NO_FATALS(cluster_->AssertNoCrashes()); // If a tablet server is down, we need to skip the ClusterVerifier. if (downTS == DownTS::None) { ClusterVerifier v(cluster_.get()); NO_FATALS(v.CheckCluster()); } } INSTANTIATE_TEST_CASE_P(EnableKudu1097AndDownTS, MoveTabletParamTest, ::testing::Combine(::testing::Values(Kudu1097::Disable, Kudu1097::Enable), ::testing::Values(DownTS::None, DownTS::TabletPeer, DownTS::UninvolvedTS))); Status RunUnsafeChangeConfig(const string& tablet_id, const string& dst_host, const vector<string>& peer_uuid_list) { vector<string> command_args = { "remote_replica", "unsafe_change_config", dst_host, tablet_id }; copy(peer_uuid_list.begin(), peer_uuid_list.end(), back_inserter(command_args)); return RunKuduTool(command_args); } // Test unsafe config change when there is one follower survivor in the cluster. // 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS. // 2. Shut down leader and follower1. // 3. Trigger unsafe config change on follower2 having follower2 in the config. // 4. Wait until the new config is populated on follower2(new leader) and master. // 5. Bring up leader and follower1 and verify replicas are deleted. // 6. Verify that new config doesn't contain old leader and follower1. TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 5; FLAGS_num_replicas = 3; // tserver_unresponsive_timeout_ms is useful so that master considers // the live tservers for tablet re-replication. NO_FATALS(BuildAndStart()); LOG(INFO) << "Finding tablet leader and waiting for things to start..."; string tablet_id = tablet_replicas_.begin()->first; // Determine the list of tablet servers currently in the config. TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; vector<TServerDetails*> followers; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id, kTimeout, &leader_ts)); ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id, kTimeout, &followers)); OpId opid; ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id, COMMITTED_OPID, kTimeout, &opid)); // Shut down master so it doesn't interfere while we shut down the leader and // one of the other followers. 
cluster_->master()->Shutdown(); cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); LOG(INFO) << "Forcing unsafe config change on remaining follower " << followers[0]->uuid(); const string& follower0_addr = cluster_->tablet_server_by_uuid(followers[0]->uuid())->bound_rpc_addr().ToString(); ASSERT_OK(RunUnsafeChangeConfig(tablet_id, follower0_addr, { followers[0]->uuid() })); ASSERT_OK(WaitUntilLeader(followers[0], tablet_id, kTimeout)); ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, followers[0], tablet_id, kTimeout)); LOG(INFO) << "Restarting master..."; // Restart master so it can re-replicate the tablet to remaining tablet servers. ASSERT_OK(cluster_->master()->Restart()); // Wait for master to re-replicate. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, followers[0], tablet_id, kTimeout)); ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); ASSERT_OK(WaitForOpFromCurrentTerm(followers[0], tablet_id, COMMITTED_OPID, kTimeout, &opid)); active_tablet_servers.clear(); std::unordered_set<string> replica_uuids; for (const auto& loc : tablet_locations.replicas()) { const string& uuid = loc.ts_info().permanent_uuid(); InsertOrDie(&active_tablet_servers, uuid, tablet_servers_[uuid]); } ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id, opid.index())); // Verify that two new servers are part of new config and old // servers are gone. for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid()); } // Also verify that when we bring back followers[1] and leader, // we should see the tablet in TOMBSTONED state on these servers. ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart()); ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart()); ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::STOPPED, kTimeout)); ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::STOPPED, kTimeout)); } // Test unsafe config change when there is one leader survivor in the cluster. // 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS. // 2. Shut down both followers. // 3. Trigger unsafe config change on leader having leader in the config. // 4. Wait until the new config is populated on leader and master. // 5. Verify that new config does not contain old followers. TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 5; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Determine the list of tablet servers currently in the config. TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. 
LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout)); vector<TServerDetails*> followers; ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); // Shut down servers follower1 and follower2, // so that we can force new config on remaining leader. cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); // Restart master to cleanup cache of dead servers from its list of candidate // servers to trigger placement of new replicas on healthy servers. cluster_->master()->Shutdown(); ASSERT_OK(cluster_->master()->Restart()); LOG(INFO) << "Forcing unsafe config change on tserver " << leader_ts->uuid(); const string& leader_addr = Substitute("$0:$1", leader_ts->registration.rpc_addresses(0).host(), leader_ts->registration.rpc_addresses(0).port()); const vector<string> peer_uuid_list = { leader_ts->uuid() }; ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); // Check that new config is populated to a new follower. vector<TServerDetails*> all_tservers; TServerDetails *new_follower = nullptr; AppendValuesFromMap(tablet_servers_, &all_tservers); for (const auto& ts :all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_follower = ts; break; } } ASSERT_TRUE(new_follower != nullptr); // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, // so it is safer to wait until consensus metadata has 3 voters on new_follower. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. LOG(INFO) << "Waiting for Master to see new config..."; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid()); } } // Test unsafe config change when the unsafe config contains 2 nodes. // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. // 2. Shut down leader. // 3. Trigger unsafe config change on follower1 having follower1 and follower2 in the config. // 4. Wait until the new config is populated on new_leader and master. // 5. Verify that new config does not contain old leader. TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 4; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Determine the list of tablet servers currently in the config. 
TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout)); vector<TServerDetails*> followers; ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); // Shut down leader and prepare 2-node config. cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown(); // Restart master to cleanup cache of dead servers from its list of candidate // servers to trigger placement of new replicas on healthy servers. cluster_->master()->Shutdown(); ASSERT_OK(cluster_->master()->Restart()); LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid(); const string& follower1_addr = Substitute("$0:$1", followers[1]->registration.rpc_addresses(0).host(), followers[1]->registration.rpc_addresses(0).port()); const vector<string> peer_uuid_list = { followers[0]->uuid(), followers[1]->uuid(), }; ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list)); // Find a remaining node which will be picked for re-replication. vector<TServerDetails*> all_tservers; AppendValuesFromMap(tablet_servers_, &all_tservers); TServerDetails* new_node = nullptr; for (TServerDetails* ts : all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_TRUE(new_node != nullptr); // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, // so it is safer to wait until consensus metadata has 3 voters on follower1. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. LOG(INFO) << "Waiting for Master to see new config..."; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid()); } } // Test unsafe config change on a 5-replica tablet when the unsafe config contains 2 nodes. // 1. Instantiate external minicluster with 1 tablet having 5 replicas and 8 TS. // 2. Shut down leader and 2 followers. // 3. Trigger unsafe config change on a surviving follower with those // 2 surviving followers in the new config. // 4. Wait until the new config is populated on new_leader and master. // 5. Verify that new config does not contain old leader and old followers. 
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); // Retire the dead servers early with these settings. FLAGS_num_tablet_servers = 8; FLAGS_num_replicas = 5; NO_FATALS(BuildAndStart()); vector<TServerDetails*> tservers; vector<ExternalTabletServer*> external_tservers; AppendValuesFromMap(tablet_servers_, &tservers); for (TServerDetails* ts : tservers) { external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid())); } // Determine the list of tablet servers currently in the config. TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 5, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout)); vector<TServerDetails*> followers; ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); ASSERT_EQ(followers.size(), 4); cluster_->tablet_server_by_uuid(followers[2]->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(followers[3]->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown(); // Restart master to cleanup cache of dead servers from its list of candidate // servers to trigger placement of new replicas on healthy servers. cluster_->master()->Shutdown(); ASSERT_OK(cluster_->master()->Restart()); LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid(); const string& follower1_addr = Substitute("$0:$1", followers[1]->registration.rpc_addresses(0).host(), followers[1]->registration.rpc_addresses(0).port()); const vector<string> peer_uuid_list = { followers[0]->uuid(), followers[1]->uuid(), }; ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list)); // Find a remaining node which will be picked for re-replication. vector<TServerDetails*> all_tservers; AppendValuesFromMap(tablet_servers_, &all_tservers); TServerDetails* new_node = nullptr; for (TServerDetails* ts : all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_TRUE(new_node != nullptr); // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, // so it is safer to wait until consensus metadata has 5 voters back on new_node. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. 
LOG(INFO) << "Waiting for Master to see new config..."; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 5, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), followers[2]->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), followers[3]->uuid()); } } // Test unsafe config change when there is a pending config on a surviving leader. // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. // 2. Shut down both the followers. // 3. Trigger a regular config change on the leader which remains pending on leader. // 4. Trigger unsafe config change on the surviving leader. // 5. Wait until the new config is populated on leader and master. // 6. Verify that new config does not contain old followers and a standby node // has populated the new config. TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 5; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Determine the list of tablet servers currently in the config. TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout)); vector<TServerDetails*> followers; ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); ASSERT_EQ(followers.size(), 2); // Shut down servers follower1 and follower2, // so that leader can't replicate future config change ops. cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); // Now try to replicate a ChangeConfig operation. This should get stuck and time out // because the server can't replicate any operations. Status s = RemoveServer(leader_ts, tablet_id_, followers[1], MonoDelta::FromSeconds(2), -1); ASSERT_TRUE(s.IsTimedOut()); LOG(INFO) << "Change Config Op timed out, Sending a Replace config " << "command when change config op is pending on the leader."; const string& leader_addr = Substitute("$0:$1", leader_ts->registration.rpc_addresses(0).host(), leader_ts->registration.rpc_addresses(0).port()); const vector<string> peer_uuid_list = { leader_ts->uuid() }; ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); // Restart master to cleanup cache of dead servers from its list of candidate // servers to trigger placement of new replicas on healthy servers. 
cluster_->master()->Shutdown(); ASSERT_OK(cluster_->master()->Restart()); // Find a remaining node which will be picked for re-replication. vector<TServerDetails*> all_tservers; AppendValuesFromMap(tablet_servers_, &all_tservers); TServerDetails* new_node = nullptr; for (TServerDetails* ts : all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_TRUE(new_node != nullptr); // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, // so it is safer to wait until consensus metadata has 3 voters on new_node. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. LOG(INFO) << "Waiting for Master to see new config..."; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid()); } } // Test unsafe config change when there is a pending config on a surviving follower. // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. // 2. Shut down both the followers. // 3. Trigger a regular config change on the leader which remains pending on leader. // 4. Trigger a leader_step_down command such that leader is forced to become follower. // 5. Trigger unsafe config change on the follower. // 6. Wait until the new config is populated on leader and master. // 7. Verify that new config does not contain old followers and a standby node // has populated the new config. TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 5; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Determine the list of tablet servers currently in the config. TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout)); vector<TServerDetails*> followers; ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); // Shut down servers follower1 and follower2, // so that leader can't replicate future config change ops. 
cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); // Restart master to cleanup cache of dead servers from its // list of candidate servers to place the new replicas. cluster_->master()->Shutdown(); ASSERT_OK(cluster_->master()->Restart()); // Now try to replicate a ChangeConfig operation. This should get stuck and time out // because the server can't replicate any operations. Status s = RemoveServer(leader_ts, tablet_id_, followers[1], MonoDelta::FromSeconds(2), -1); ASSERT_TRUE(s.IsTimedOut()); // Force leader to step down, best effort command since the leadership // could change anytime during cluster lifetime. string stderr; s = RunKuduTool({ "tablet", "leader_step_down", cluster_->master()->bound_rpc_addr().ToString(), tablet_id_ }, nullptr, &stderr); bool not_currently_leader = stderr.find( Status::IllegalState("").CodeAsString()) != string::npos; ASSERT_TRUE(s.ok() || not_currently_leader) << "stderr: " << stderr; LOG(INFO) << "Change Config Op timed out, Sending a Replace config " << "command when change config op is pending on the leader."; const string& leader_addr = Substitute("$0:$1", leader_ts->registration.rpc_addresses(0).host(), leader_ts->registration.rpc_addresses(0).port()); const vector<string> peer_uuid_list = { leader_ts->uuid() }; ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); // Find a remaining node which will be picked for re-replication. vector<TServerDetails*> all_tservers; AppendValuesFromMap(tablet_servers_, &all_tservers); TServerDetails* new_node = nullptr; for (TServerDetails* ts : all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_TRUE(new_node != nullptr); // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, // so it is safer to wait until consensus metadata has 3 voters on new_node. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. LOG(INFO) << "Waiting for Master to see new config..."; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid()); } } // Test unsafe config change when there are back to back pending configs on leader logs. // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. // 2. Shut down both the followers. // 3. Trigger a regular config change on the leader which remains pending on leader. // 4. Set a fault crash flag to trigger upon next commit of config change. // 5. Trigger unsafe config change on the surviving leader which should trigger // the fault while the old config change is being committed. // 6. Shutdown and restart the leader and verify that tablet bootstrapped on leader. // 7. Verify that a new node has populated the new config with 3 voters. TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 5; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Determine the list of tablet servers currently in the config. 
TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); // Wait for initial NO_OP to be committed by the leader. TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout)); vector<TServerDetails*> followers; ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); // Shut down servers follower1 and follower2, // so that leader can't replicate future config change ops. cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown(); cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); // Now try to replicate a ChangeConfig operation. This should get stuck and time out // because the server can't replicate any operations. Status s = RemoveServer(leader_ts, tablet_id_, followers[1], MonoDelta::FromSeconds(2), -1); ASSERT_TRUE(s.IsTimedOut()); LOG(INFO) << "Change Config Op timed out, Sending a Replace config " << "command when change config op is pending on the leader."; const string& leader_addr = Substitute("$0:$1", leader_ts->registration.rpc_addresses(0).host(), leader_ts->registration.rpc_addresses(0).port()); const vector<string> peer_uuid_list = { leader_ts->uuid() }; ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); // Inject the crash via fault_crash_before_cmeta_flush flag. // Tablet will find 2 pending configs back to back during bootstrap, // one from ChangeConfig (RemoveServer) and another from UnsafeChangeConfig. ASSERT_OK(cluster_->SetFlag( cluster_->tablet_server_by_uuid(leader_ts->uuid()), "fault_crash_before_cmeta_flush", "1.0")); // Find a remaining node which will be picked for re-replication. vector<TServerDetails*> all_tservers; AppendValuesFromMap(tablet_servers_, &all_tservers); TServerDetails* new_node = nullptr; for (TServerDetails* ts : all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_TRUE(new_node != nullptr); // Restart master to cleanup cache of dead servers from its list of candidate // servers to trigger placement of new replicas on healthy servers. cluster_->master()->Shutdown(); ASSERT_OK(cluster_->master()->Restart()); ASSERT_OK(cluster_->tablet_server_by_uuid( leader_ts->uuid())->WaitForInjectedCrash(kTimeout)); cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown(); ASSERT_OK(cluster_->tablet_server_by_uuid( leader_ts->uuid())->Restart()); ASSERT_OK(WaitForNumTabletsOnTS(leader_ts, 1, kTimeout, nullptr)); ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. 
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 3, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid()); ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid()); } } // Test unsafe config change on a 5-replica tablet when the mulitple pending configs // on the surviving node. // 1. Instantiate external minicluster with 1 tablet having 5 replicas and 9 TS. // 2. Shut down all the followers. // 3. Trigger unsafe config changes on the surviving leader with those // dead followers in the new config. // 4. Wait until the new config is populated on the master and the new leader. // 5. Verify that new config does not contain old followers. TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) { const MonoDelta kTimeout = MonoDelta::FromSeconds(30); FLAGS_num_tablet_servers = 9; FLAGS_num_replicas = 5; // Retire the dead servers early with these settings. NO_FATALS(BuildAndStart()); vector<TServerDetails*> tservers; vector<ExternalTabletServer*> external_tservers; AppendValuesFromMap(tablet_servers_, &tservers); for (TServerDetails* ts : tservers) { external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid())); } // Determine the list of tablet servers currently in the config. TabletServerMap active_tablet_servers; auto iter = tablet_replicas_.equal_range(tablet_id_); for (auto it = iter.first; it != iter.second; ++it) { InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); } // Get a baseline config reported to the master. LOG(INFO) << "Waiting for Master to see the current replicas..."; master::TabletLocationsPB tablet_locations; bool has_leader; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 5, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); TServerDetails* leader_ts; ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); vector<TServerDetails*> followers; for (const auto& elem : active_tablet_servers) { if (elem.first == leader_ts->uuid()) { continue; } followers.push_back(elem.second); cluster_->tablet_server_by_uuid(elem.first)->Shutdown(); } ASSERT_EQ(4, followers.size()); // Shutdown master to cleanup cache of dead servers from its list of candidate // servers to trigger placement of new replicas on healthy servers when we restart later. cluster_->master()->Shutdown(); const string& leader_addr = Substitute("$0:$1", leader_ts->registration.rpc_addresses(0).host(), leader_ts->registration.rpc_addresses(0).port()); // This should keep the multiple pending configs on the node since we are // adding all the dead followers to the new config, and then eventually we write // just one surviving node to the config. // New config write sequences are: {ABCDE}, {ABCD}, {ABC}, {AB}, {A}, // A being the leader node where config is written and rest of the nodes are // dead followers. 
for (int num_replicas = followers.size(); num_replicas >= 0; num_replicas--) { vector<string> peer_uuid_list; peer_uuid_list.push_back(leader_ts->uuid()); for (int i = 0; i < num_replicas; i++) { peer_uuid_list.push_back(followers[i]->uuid()); } ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); } ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, leader_ts, tablet_id_, kTimeout)); ASSERT_OK(cluster_->master()->Restart()); // Find a remaining node which will be picked for re-replication. vector<TServerDetails*> all_tservers; AppendValuesFromMap(tablet_servers_, &all_tservers); TServerDetails* new_node = nullptr; for (TServerDetails* ts : all_tservers) { if (!ContainsKey(active_tablet_servers, ts->uuid())) { new_node = ts; break; } } ASSERT_TRUE(new_node != nullptr); // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, // so it is safer to wait until consensus metadata has 5 voters on new_node. ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout)); // Wait for the master to be notified of the config change. LOG(INFO) << "Waiting for Master to see new config..."; ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), 5, tablet_id_, kTimeout, WAIT_FOR_LEADER, VOTER_REPLICA, &has_leader, &tablet_locations)); LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); for (const master::TabletLocationsPB_ReplicaPB& replica : tablet_locations.replicas()) { // Verify that old followers aren't part of new config. for (const auto& old_follower : followers) { ASSERT_NE(replica.ts_info().permanent_uuid(), old_follower->uuid()); } } } Status GetTermFromConsensus(const vector<TServerDetails*>& tservers, const string& tablet_id, int64_t *current_term) { ConsensusStatePB cstate; for (auto& ts : tservers) { RETURN_NOT_OK( GetConsensusState(ts, tablet_id, MonoDelta::FromSeconds(10), EXCLUDE_HEALTH_REPORT, &cstate)); if (!cstate.leader_uuid().empty() && IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config()) && cstate.has_current_term()) { *current_term = cstate.current_term(); return Status::OK(); } } return Status::NotFound(Substitute( "No leader replica found for tablet $0", tablet_id)); } TEST_F(AdminCliTest, TestAbruptLeaderStepDown) { const MonoDelta kTimeout = MonoDelta::FromSeconds(10); const vector<string> kMasterFlags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", }; const vector<string> kTserverFlags = { "--enable_leader_failure_detection=false", }; FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); // Wait for the tablet to be running. vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); } // Elect the leader and wait for the tservers and master to see the leader. const auto* leader = tservers[0]; ASSERT_OK(StartElection(leader, tablet_id_, kTimeout)); ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, /*minimum_index=*/1)); TServerDetails* master_observed_leader; ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); // Ask the leader to step down. 
string stderr; Status s = RunKuduTool({ "tablet", "leader_step_down", "--abrupt", cluster_->master()->bound_rpc_addr().ToString(), tablet_id_ }, nullptr, &stderr); // There shouldn't be a leader now, since failure detection is disabled. for (const auto* ts : tservers) { s = GetReplicaStatusAndCheckIfLeader(ts, tablet_id_, kTimeout); ASSERT_TRUE(s.IsIllegalState()) << "Expected IllegalState because replica " "should not be the leader: " << s.ToString(); } } TEST_F(AdminCliTest, TestGracefulLeaderStepDown) { const MonoDelta kTimeout = MonoDelta::FromSeconds(10); const vector<string> kMasterFlags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", }; const vector<string> kTserverFlags = { "--enable_leader_failure_detection=false", }; FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); // Wait for the tablet to be running. vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); } // Elect the leader and wait for the tservers and master to see the leader. const auto* leader = tservers[0]; ASSERT_OK(StartElection(leader, tablet_id_, kTimeout)); ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, /*minimum_index=*/1)); TServerDetails* master_observed_leader; ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); // Ask the leader to transfer leadership to a specific peer. const auto new_leader_uuid = tservers[1]->uuid(); string stderr; Status s = RunKuduTool({ "tablet", "leader_step_down", Substitute("--new_leader_uuid=$0", new_leader_uuid), cluster_->master()->bound_rpc_addr().ToString(), tablet_id_ }, nullptr, &stderr); ASSERT_TRUE(s.ok()) << s.ToString(); // Eventually, the chosen node should become leader. ASSERT_EVENTUALLY([&]() { ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_EQ(new_leader_uuid, master_observed_leader->uuid()); }); // Ask the leader to transfer leadership. s = RunKuduTool({ "tablet", "leader_step_down", cluster_->master()->bound_rpc_addr().ToString(), tablet_id_ }, nullptr, &stderr); ASSERT_TRUE(s.ok()) << s.ToString(); // Eventually, some other node should become leader. const std::unordered_set<string> possible_new_leaders = { tservers[0]->uuid(), tservers[2]->uuid() }; ASSERT_EVENTUALLY([&]() { ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_TRUE(ContainsKey(possible_new_leaders, master_observed_leader->uuid())); }); } // Leader should reject requests to transfer leadership to a non-member of the // config. TEST_F(AdminCliTest, TestLeaderTransferToEvictedPeer) { const MonoDelta kTimeout = MonoDelta::FromSeconds(10); // In this test, tablet leadership is manually controlled and the master // should not rereplicate. const vector<string> kMasterFlags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", "--master_add_server_when_underreplicated=false", }; const vector<string> kTserverFlags = { "--enable_leader_failure_detection=false", }; FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); // Wait for the tablet to be running. 
vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); } // Elect the leader and wait for the tservers and master to see the leader. const auto* leader = tservers[0]; ASSERT_OK(StartElection(leader, tablet_id_, kTimeout)); ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, /*minimum_index=*/1)); TServerDetails* master_observed_leader; ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); // Evict the first follower. string stderr; const auto evicted_uuid = tservers[1]->uuid(); Status s = RunKuduTool({ "tablet", "change_config", "remove_replica", master_addr, tablet_id_, evicted_uuid, }, nullptr, &stderr); ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr; // Ask the leader to transfer leadership to the evicted peer. stderr.clear(); s = RunKuduTool({ "tablet", "leader_step_down", Substitute("--new_leader_uuid=$0", evicted_uuid), master_addr, tablet_id_ }, nullptr, &stderr); ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr; ASSERT_STR_CONTAINS(stderr, Substitute("tablet server $0 is not a voter in the active config", evicted_uuid)); } // Leader should reject requests to transfer leadership to a non-voter of the // config. TEST_F(AdminCliTest, TestLeaderTransferToNonVoter) { const MonoDelta kTimeout = MonoDelta::FromSeconds(10); // In this test, tablet leadership is manually controlled and the master // should not rereplicate. const vector<string> kMasterFlags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", "--master_add_server_when_underreplicated=false", }; const vector<string> kTserverFlags = { "--enable_leader_failure_detection=false", }; FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); // Wait for the tablet to be running. vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); } // Elect the leader and wait for the tservers and master to see the leader. const auto* leader = tservers[0]; ASSERT_OK(StartElection(leader, tablet_id_, kTimeout)); ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, /*minimum_index=*/1)); TServerDetails* master_observed_leader; ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); // Demote the first follower to a non-voter. string stderr; const auto non_voter_uuid = tservers[1]->uuid(); Status s = RunKuduTool({ "tablet", "change_config", "change_replica_type", master_addr, tablet_id_, non_voter_uuid, "NON_VOTER", }, nullptr, &stderr); ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr; // Ask the leader to transfer leadership to the non-voter. 
stderr.clear(); s = RunKuduTool({ "tablet", "leader_step_down", Substitute("--new_leader_uuid=$0", non_voter_uuid), master_addr, tablet_id_ }, nullptr, &stderr); ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr; ASSERT_STR_CONTAINS(stderr, Substitute("tablet server $0 is not a voter in the active config", non_voter_uuid)); } // Leader transfer causes the tablet to stop accepting new writes. This test // tests that writes can still succeed even if lots of leader transfers and // abrupt stepdowns are happening, as long as the writes have long enough // timeouts to ride over the unstable leadership. TEST_F(AdminCliTest, TestSimultaneousLeaderTransferAndAbruptStepdown) { if (!AllowSlowTests()) { LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; return; } const MonoDelta kTimeout = MonoDelta::FromSeconds(10); FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Wait for the tablet to be running. vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); } // Start a workload with long timeouts. Everything should eventually go // through but it might take a while given the leadership changes. TestWorkload workload(cluster_.get()); workload.set_table_name(kTableId); workload.set_timeout_allowed(false); workload.set_write_timeout_millis(60000); workload.set_num_replicas(FLAGS_num_replicas); workload.set_num_write_threads(1); workload.set_write_batch_size(1); workload.Setup(); workload.Start(); const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); while (workload.rows_inserted() < 1000) { // Issue a graceful stepdown and then an abrupt stepdown, every second. // The results are ignored because the tools might fail due to the // constant leadership changes. ignore_result(RunKuduTool({ "tablet", "leader_step_down", master_addr, tablet_id_ })); ignore_result(RunKuduTool({ "tablet", "leader_step_down", "--abrupt", master_addr, tablet_id_ })); SleepFor(MonoDelta::FromMilliseconds(1000)); } } class TestLeaderStepDown : public AdminCliTest, public ::testing::WithParamInterface<LeaderStepDownMode> { }; INSTANTIATE_TEST_CASE_P(, TestLeaderStepDown, ::testing::Values(LeaderStepDownMode::ABRUPT, LeaderStepDownMode::GRACEFUL)); TEST_P(TestLeaderStepDown, TestLeaderStepDownWhenNotPresent) { FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart( { "--enable_leader_failure_detection=false" }, { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" })); vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, MonoDelta::FromSeconds(10))); } int64_t current_term; ASSERT_TRUE(GetTermFromConsensus(tservers, tablet_id_, &current_term).IsNotFound()); string stdout; ASSERT_OK(RunKuduTool({ "tablet", "leader_step_down", Substitute("--abrupt=$0", GetParam() == LeaderStepDownMode::ABRUPT), cluster_->master()->bound_rpc_addr().ToString(), tablet_id_ }, &stdout)); ASSERT_STR_CONTAINS(stdout, Substitute("No leader replica found for tablet $0", tablet_id_)); } TEST_P(TestLeaderStepDown, TestRepeatedLeaderStepDown) { FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; // Speed up leader failure detection and shorten the leader transfer period. 
NO_FATALS(BuildAndStart({ "--raft_heartbeat_interval_ms=50" })); vector<TServerDetails*> tservers; AppendValuesFromMap(tablet_servers_, &tservers); ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); for (auto& ts : tservers) { ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, MonoDelta::FromSeconds(10))); } // Start a workload. TestWorkload workload(cluster_.get()); workload.set_table_name(kTableId); workload.set_timeout_allowed(false); workload.set_write_timeout_millis(30000); workload.set_num_replicas(FLAGS_num_replicas); workload.set_num_write_threads(4); workload.set_write_batch_size(1); workload.Setup(); workload.Start(); // Issue stepdown requests repeatedly. If we leave some time for an election, // the workload should still make progress. const string abrupt_flag = Substitute("--abrupt=$0", GetParam() == LeaderStepDownMode::ABRUPT); string stdout; string stderr; while (workload.rows_inserted() < 2000) { stdout.clear(); stderr.clear(); Status s = RunKuduTool({ "tablet", "leader_step_down", abrupt_flag, cluster_->master()->bound_rpc_addr().ToString(), tablet_id_ }, &stdout, &stderr); bool not_currently_leader = stderr.find( Status::IllegalState("").CodeAsString()) != string::npos; ASSERT_TRUE(s.ok() || not_currently_leader) << s.ToString(); SleepFor(MonoDelta::FromMilliseconds(1000)); } ClusterVerifier(cluster_.get()).CheckCluster(); } TEST_F(AdminCliTest, TestDeleteTable) { FLAGS_num_tablet_servers = 1; FLAGS_num_replicas = 1; NO_FATALS(BuildAndStart()); string master_address = cluster_->master()->bound_rpc_addr().ToString(); shared_ptr<KuduClient> client; ASSERT_OK(KuduClientBuilder() .add_master_server_addr(master_address) .Build(&client)); ASSERT_TOOL_OK( "table", "delete", master_address, kTableId ); vector<string> tables; ASSERT_OK(client->ListTables(&tables)); ASSERT_TRUE(tables.empty()); } TEST_F(AdminCliTest, TestListTables) { FLAGS_num_tablet_servers = 1; FLAGS_num_replicas = 1; NO_FATALS(BuildAndStart()); string stdout; ASSERT_OK(RunKuduTool({ "table", "list", cluster_->master()->bound_rpc_addr().ToString() }, &stdout)); vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty()); ASSERT_EQ(1, stdout_lines.size()); ASSERT_EQ(Substitute("$0\n", kTableId), stdout_lines[0]); } TEST_F(AdminCliTest, TestListTablesDetail) { FLAGS_num_tablet_servers = 3; FLAGS_num_replicas = 3; NO_FATALS(BuildAndStart()); // Add another table to test multiple tables output. const string kAnotherTableId = "TestAnotherTable"; KuduSchema client_schema(client::KuduSchemaFromSchema(schema_)); gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); ASSERT_OK(table_creator->table_name(kAnotherTableId) .schema(&client_schema) .set_range_partition_columns({ "key" }) .num_replicas(FLAGS_num_replicas) .Create()); // Grab list of tablet_ids from any tserver. vector<TServerDetails*> tservers; vector<string> tablet_ids; AppendValuesFromMap(tablet_servers_, &tservers); ListRunningTabletIds(tservers.front(), MonoDelta::FromSeconds(30), &tablet_ids); string stdout; ASSERT_OK(RunKuduTool({ "table", "list", "--list_tablets", cluster_->master()->bound_rpc_addr().ToString() }, &stdout)); vector<string> stdout_lines = Split(stdout, "\n", strings::SkipEmpty()); // Verify multiple tables along with their tablets and replica-uuids. 
ASSERT_EQ(10, stdout_lines.size()); ASSERT_STR_CONTAINS(stdout, kTableId); ASSERT_STR_CONTAINS(stdout, kAnotherTableId); ASSERT_STR_CONTAINS(stdout, tablet_ids.front()); ASSERT_STR_CONTAINS(stdout, tablet_ids.back()); for (auto& ts : tservers) { ASSERT_STR_CONTAINS(stdout, ts->uuid()); ASSERT_STR_CONTAINS(stdout, ts->uuid()); } } TEST_F(AdminCliTest, RebalancerReportOnly) { static const char kReferenceOutput[] = R"***(Per-server replica distribution summary: Statistic | Value -----------------------+---------- Minimum Replica Count | 0 Maximum Replica Count | 1 Average Replica Count | 0.600000 Per-table replica distribution summary: Replica Skew | Value --------------+---------- Minimum | 1 Maximum | 1 Average | 1.000000)***"; FLAGS_num_tablet_servers = 5; NO_FATALS(BuildAndStart()); string out; string err; Status s = RunKuduTool({ "cluster", "rebalance", cluster_->master()->bound_rpc_addr().ToString(), "--report_only", }, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); // The rebalancer should report on tablet replica distribution. The output // should match the reference report: the distribution of the replicas // is 100% repeatable given the number of tables created by the test, // the replication factor and the number of tablet servers. ASSERT_STR_CONTAINS(out, kReferenceOutput); // The actual rebalancing should not run. ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:") << ToolRunInfo(s, out, err); } // Make sure the rebalancer doesn't start if a tablet server is down. class RebalanceStartCriteriaTest : public AdminCliTest, public ::testing::WithParamInterface<Kudu1097> { }; INSTANTIATE_TEST_CASE_P(, RebalanceStartCriteriaTest, ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) { const bool is_343_scheme = (GetParam() == Kudu1097::Enable); const vector<string> kMasterFlags = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), }; const vector<string> kTserverFlags = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), }; FLAGS_num_tablet_servers = 5; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); // Shutdown one of the tablet servers. HostPort ts_host_port; { auto* ts = cluster_->tablet_server(0); ASSERT_NE(nullptr, ts); ts_host_port = ts->bound_rpc_hostport(); ts->Shutdown(); } string out; string err; Status s = RunKuduTool({ "cluster", "rebalance", cluster_->master()->bound_rpc_addr().ToString() }, &out, &err); ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err); const auto err_msg_pattern = Substitute( "Illegal state: tablet server .* \\($0\\): " "unacceptable health status UNAVAILABLE", ts_host_port.ToString()); ASSERT_STR_MATCHES(err, err_msg_pattern); } // Create tables with unbalanced replica distribution: useful in // rebalancer-related tests. static Status CreateUnbalancedTables( cluster::ExternalMiniCluster* cluster, client::KuduClient* client, const Schema& table_schema, const string& table_name_pattern, int num_tables, int rep_factor, int tserver_idx_from, int tserver_num, int tserver_unresponsive_ms) { // Keep running only some tablet servers and shut down the rest. for (auto i = tserver_idx_from; i < tserver_num; ++i) { cluster->tablet_server(i)->Shutdown(); } // Wait for the catalog manager to understand that not all tablet servers // are available. SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4)); // Create tables with their tablet replicas landing only on the tablet servers // which are up and running. 
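  // 'kudu perf loadgen' is run against each table right after creation so the
  // tablets contain some data; this makes the subsequent replica moves copy
  // something non-trivial rather than empty tablets. The invocation below is
  // roughly equivalent to:
  //
  //   kudu perf loadgen <master_addr> --table_name=<name> \
  //       --table_num_replicas=<rf> --string_fixed=unbalanced_tables_test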
KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema)); for (auto i = 0; i < num_tables; ++i) { const string table_name = Substitute(table_name_pattern, i); unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); RETURN_NOT_OK(table_creator->table_name(table_name) .schema(&client_schema) .add_hash_partitions({ "key" }, 3) .num_replicas(rep_factor) .Create()); RETURN_NOT_OK(RunKuduTool({ "perf", "loadgen", cluster->master()->bound_rpc_addr().ToString(), Substitute("--table_name=$0", table_name), Substitute("--table_num_replicas=$0", rep_factor), "--string_fixed=unbalanced_tables_test", })); } for (auto i = tserver_idx_from; i < tserver_num; ++i) { RETURN_NOT_OK(cluster->tablet_server(i)->Restart()); } return Status::OK(); } // A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica // management schemes. During replica movement, a light workload is run against // every table being rebalanced. This test covers different replication factors. class RebalanceParamTest : public AdminCliTest, public ::testing::WithParamInterface<tuple<int, Kudu1097>> { }; INSTANTIATE_TEST_CASE_P(, RebalanceParamTest, ::testing::Combine(::testing::Values(1, 2, 3, 5), ::testing::Values(Kudu1097::Disable, Kudu1097::Enable))); TEST_P(RebalanceParamTest, Rebalance) { if (!AllowSlowTests()) { LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; return; } const auto& param = GetParam(); const auto kRepFactor = std::get<0>(param); const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable); constexpr auto kNumTservers = 7; constexpr auto kNumTables = 5; const string table_name_pattern = "rebalance_test_table_$0"; constexpr auto kTserverUnresponsiveMs = 3000; const auto timeout = MonoDelta::FromSeconds(30); const vector<string> kMasterFlags = { "--allow_unsafe_replication_factor", Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs), }; const vector<string> kTserverFlags = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), }; FLAGS_num_tablet_servers = kNumTservers; FLAGS_num_replicas = kRepFactor; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); ASSERT_OK(CreateUnbalancedTables( cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables, kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs)); // Workloads aren't run for 3-2-3 replica movement with RF = 1 because // the tablet is unavailable during the move until the target voter replica // is up and running. That might take some time, and to avoid flakiness or // setting longer timeouts, RF=1 replicas are moved with no concurrent // workload running. // // TODO(aserbin): clarify why even with 3-4-3 it's a bit flaky now. vector<unique_ptr<TestWorkload>> workloads; //if (kRepFactor > 1 || is_343_scheme) { if (kRepFactor > 1) { for (auto i = 0; i < kNumTables; ++i) { const string table_name = Substitute(table_name_pattern, i); // The workload is light (1 thread, 1 op batches) so that new replicas // bootstrap and converge quickly. 
unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get())); workload->set_table_name(table_name); workload->set_num_replicas(kRepFactor); workload->set_num_write_threads(1); workload->set_write_batch_size(1); workload->set_write_timeout_millis(timeout.ToMilliseconds()); workload->set_already_present_allowed(true); workload->Setup(); workload->Start(); workloads.emplace_back(std::move(workload)); } } const vector<string> tool_args = { "cluster", "rebalance", cluster_->master()->bound_rpc_addr().ToString(), "--move_single_replicas=enabled", }; { string out; string err; const Status s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") << "stderr: " << err; } // Next run should report the cluster as balanced and no replica movement // should be attempted. { string out; string err; const Status s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced (moved 0 replicas)") << "stderr: " << err; } for (auto& workload : workloads) { workload->StopAndJoin(); } NO_FATALS(cluster_->AssertNoCrashes()); NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); // Now add a new tablet server into the cluster and make sure the rebalancer // will re-distribute replicas. ASSERT_OK(cluster_->AddTabletServer()); { string out; string err; const Status s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") << "stderr: " << err; // The cluster was un-balanced, so many replicas should have been moved. ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)"); } NO_FATALS(cluster_->AssertNoCrashes()); NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); } // Common base for the rebalancer-related test below. 
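// Derived test classes tune the number of tables, the replication factor, and
// the number of tablet servers; Prepare() builds the cluster and creates a set
// of deliberately unbalanced tables via CreateUnbalancedTables() above, and
// IsRebalancingInProgress() uses ksck output to detect in-flight tablet copies.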
class RebalancingTest : public tserver::TabletServerIntegrationTestBase, public ::testing::WithParamInterface<Kudu1097> { public: RebalancingTest(int num_tables = 10, int rep_factor = 3, int num_tservers = 8, int tserver_unresponsive_ms = 3000, const string& table_name_pattern = "rebalance_test_table_$0") : TabletServerIntegrationTestBase(), is_343_scheme_(GetParam() == Kudu1097::Enable), num_tables_(num_tables), rep_factor_(rep_factor), num_tservers_(num_tservers), tserver_unresponsive_ms_(tserver_unresponsive_ms), table_name_pattern_(table_name_pattern) { master_flags_ = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_), Substitute("--tserver_unresponsive_timeout_ms=$0", tserver_unresponsive_ms_), }; tserver_flags_ = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_), }; } protected: static const char* const kExitOnSignalStr; void Prepare(const vector<string>& extra_tserver_flags = {}, const vector<string>& extra_master_flags = {}) { copy(extra_tserver_flags.begin(), extra_tserver_flags.end(), back_inserter(tserver_flags_)); copy(extra_master_flags.begin(), extra_master_flags.end(), back_inserter(master_flags_)); FLAGS_num_tablet_servers = num_tservers_; FLAGS_num_replicas = rep_factor_; NO_FATALS(BuildAndStart(tserver_flags_, master_flags_)); ASSERT_OK(CreateUnbalancedTables( cluster_.get(), client_.get(), schema_, table_name_pattern_, num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_, tserver_unresponsive_ms_)); } // When the rebalancer starts moving replicas, ksck detects corruption // (that's why RuntimeError), seeing affected tables as non-healthy // with data state of corresponding tablets as TABLET_DATA_COPYING. If using // this method, it's a good idea to inject some latency into tablet copying // to be able to spot the TABLET_DATA_COPYING state, see the // '--tablet_copy_download_file_inject_latency_ms' flag for tservers. bool IsRebalancingInProgress() { string out; const auto s = RunKuduTool({ "cluster", "ksck", cluster_->master()->bound_rpc_addr().ToString(), }, &out); if (s.IsRuntimeError() && out.find("Data state: TABLET_DATA_COPYING") != string::npos) { return true; } return false; } const bool is_343_scheme_; const int num_tables_; const int rep_factor_; const int num_tservers_; const int tserver_unresponsive_ms_; const string table_name_pattern_; vector<string> tserver_flags_; vector<string> master_flags_; }; const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal"; // Make sure the rebalancer is able to do its job if running concurrently // with DDL activity on the cluster. class DDLDuringRebalancingTest : public RebalancingTest { public: DDLDuringRebalancingTest() : RebalancingTest(20 /* num_tables */) { } }; INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest, ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) { if (!AllowSlowTests()) { LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; return; } NO_FATALS(Prepare()); // The latch that controls the lifecycle of the concurrent DDL activity. 
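  // Each DDL worker thread below polls the latch via WaitFor() and exits once
  // the latch has been counted down (either by the timer thread or by the
  // scoped cleanup lambdas).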
  CountDownLatch run_latch(1);
  thread creator([&]() {
    KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
    for (auto idx = 0; ; ++idx) {
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
        break;
      }
      const string table_name = Substitute("rebalancer_extra_table_$0", idx);
      unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
      CHECK_OK(table_creator->table_name(table_name)
               .schema(&client_schema)
               .add_hash_partitions({ "key" }, 3)
               .num_replicas(rep_factor_)
               .Create());
    }
  });
  auto creator_cleanup = MakeScopedCleanup([&]() {
    run_latch.CountDown();
    creator.join();
  });

  thread deleter([&]() {
    for (auto idx = 0; idx < num_tables_; ++idx) {
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
        break;
      }
      CHECK_OK(client_->DeleteTable(Substitute(table_name_pattern_, idx)));
    }
  });
  auto deleter_cleanup = MakeScopedCleanup([&]() {
    run_latch.CountDown();
    deleter.join();
  });

  thread alterer([&]() {
    const string kTableName = "rebalancer_dynamic_table";
    const string kNewTableName = "rebalancer_dynamic_table_new_name";
    while (true) {
      // Create table.
      {
        KuduSchema schema;
        KuduSchemaBuilder builder;
        builder.AddColumn("key")->Type(KuduColumnSchema::INT64)->
            NotNull()->
            PrimaryKey();
        builder.AddColumn("a")->Type(KuduColumnSchema::INT64);
        CHECK_OK(builder.Build(&schema));
        unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
        CHECK_OK(table_creator->table_name(kTableName)
                 .schema(&schema)
                 .set_range_partition_columns({})
                 .num_replicas(rep_factor_)
                 .Create());
      }
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
        break;
      }
      // Drop a column.
      {
        unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
        alt->DropColumn("a");
        CHECK_OK(alt->Alter());
      }
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
        break;
      }
      // Add back the column with a different type.
      {
        unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
        alt->AddColumn("a")->Type(KuduColumnSchema::STRING);
        CHECK_OK(alt->Alter());
      }
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
        break;
      }
      // Rename the table.
      {
        unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
        alt->RenameTo(kNewTableName);
        CHECK_OK(alt->Alter());
      }
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
        break;
      }
      // Drop the renamed table.
      CHECK_OK(client_->DeleteTable(kNewTableName));
      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
        break;
      }
    }
  });
  auto alterer_cleanup = MakeScopedCleanup([&]() {
    run_latch.CountDown();
    alterer.join();
  });

  thread timer([&]() {
    SleepFor(MonoDelta::FromSeconds(30));
    run_latch.CountDown();
  });
  auto timer_cleanup = MakeScopedCleanup([&]() {
    timer.join();
  });

  const vector<string> tool_args = {
    "cluster",
    "rebalance",
    cluster_->master()->bound_rpc_addr().ToString(),
  };

  // Run the rebalancer concurrently with the DDL operations. The second run
  // of the rebalancer (the second run starts after joining the timer thread)
  // is necessary to balance the cluster after the DDL activity stops: that's
  // the easiest way to make sure the rebalancer will take into account
  // all DDL changes that happened.
  //
  // The signal to terminate the DDL activity (done via run_latch.CountDown())
  // is sent from a separate timer thread instead of doing SleepFor() after
  // the first run of the rebalancer followed by run_latch.CountDown().
  // That's to avoid dependency on the rebalancer behavior if it spots on-going
  // DDL activity and continues running over and over again.
  for (auto i = 0; i < 2; ++i) {
    if (i == 1) {
      timer.join();
      timer_cleanup.cancel();

      // Wait for all the DDL activity to complete.
      alterer.join();
      alterer_cleanup.cancel();
      deleter.join();
      deleter_cleanup.cancel();
      creator.join();
      creator_cleanup.cancel();
    }

    string out;
    string err;
    const auto s = RunKuduTool(tool_args, &out, &err);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
        << "stderr: " << err;
  }

  // Next (3rd) run should report the cluster as balanced and
  // no replica movement should be attempted.
  {
    string out;
    string err;
    const auto s = RunKuduTool(tool_args, &out, &err);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
    ASSERT_STR_CONTAINS(
        out, "rebalancing is complete: cluster is balanced (moved 0 replicas)")
        << "stderr: " << err;
  }

  NO_FATALS(cluster_->AssertNoCrashes());
  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}

// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers
// might report errors, but they should not get stuck and the cluster should
// remain in good shape (i.e. no crashes, no data inconsistencies). Re-running a
// single rebalancer session again should bring the cluster to a balanced state.
class ConcurrentRebalancersTest : public RebalancingTest {
 public:
  ConcurrentRebalancersTest()
      : RebalancingTest(10 /* num_tables */) {
  }
};
INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest,
                        ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
  if (!AllowSlowTests()) {
    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
    return;
  }
  NO_FATALS(Prepare());

  const vector<string> tool_args = {
    "cluster",
    "rebalance",
    cluster_->master()->bound_rpc_addr().ToString(),
  };

  const auto runner_func = [&]() {
    string out;
    string err;
    const auto s = RunKuduTool(tool_args, &out, &err);
    if (!s.ok()) {
      // One might expect a bad status returned: e.g., due to some race the
      // rebalancer wasn't able to make progress for more than
      // --max_staleness_interval_sec, etc.
      LOG(INFO) << "rebalancer run info: " << ToolRunInfo(s, out, err);
    }
    // Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc.
    return s.ToString().find(kExitOnSignalStr) == string::npos;
  };

  CountDownLatch start_synchronizer(1);
  vector<thread> concurrent_runners;
  for (auto i = 0; i < 5; ++i) {
    concurrent_runners.emplace_back([&]() {
      start_synchronizer.Wait();
      CHECK(runner_func());
    });
  }

  // Run rebalancers concurrently and wait for their completion.
  start_synchronizer.CountDown();
  for (auto& runner : concurrent_runners) {
    runner.join();
  }

  NO_FATALS(cluster_->AssertNoCrashes());
  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());

  {
    string out;
    string err;
    const auto s = RunKuduTool(tool_args, &out, &err);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
        << "stderr: " << err;
  }

  // Next run should report the cluster as balanced and no replica movement
  // should be attempted: at least one run of the rebalancer prior to this
  // should succeed, so the next run is about running the tool against an
  // already balanced cluster.
{ string out; string err; const auto s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced (moved 0 replicas)") << "stderr: " << err; } NO_FATALS(cluster_->AssertNoCrashes()); NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); } // The rebalancer should stop and exit upon detecting a tablet server that // went down. That's a simple and effective way of preventing concurrent replica // movement by the rebalancer and the automatic re-replication (the catalog // manager tries to move replicas from the unreachable tablet server). class TserverGoesDownDuringRebalancingTest : public RebalancingTest { public: TserverGoesDownDuringRebalancingTest() : RebalancingTest(5 /* num_tables */) { } }; INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest, ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) { if (!AllowSlowTests()) { LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; return; } const vector<string> kTserverExtraFlags = { // Slow down tablet copy to make rebalancing step running longer // and become observable via tablet data states output by ksck. "--tablet_copy_download_file_inject_latency_ms=1500", "--follower_unavailable_considered_failed_sec=30", }; NO_FATALS(Prepare(kTserverExtraFlags)); // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state // shortly after initial setup. ASSERT_EVENTUALLY([&]() { ASSERT_TOOL_OK( "cluster", "ksck", cluster_->master()->bound_rpc_addr().ToString() ) }); Random r(SeedRandom()); const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_; atomic<bool> run(true); // The thread that shuts down the selected tablet server. thread stopper([&]() { while (run && !IsRebalancingInProgress()) { SleepFor(MonoDelta::FromMilliseconds(10)); } // All right, it's time to stop the selected tablet server. cluster_->tablet_server(shutdown_tserver_idx)->Shutdown(); }); auto stopper_cleanup = MakeScopedCleanup([&]() { run = false; stopper.join(); }); { string out; string err; const auto s = RunKuduTool({ "cluster", "rebalance", cluster_->master()->bound_rpc_addr().ToString(), // Limiting the number of replicas to move. This is to make the rebalancer // run longer, making sure the rebalancing is in progress when the tablet // server goes down. "--max_moves_per_server=1", }, &out, &err); ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err); // The rebalancer tool should not crash. ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr); ASSERT_STR_MATCHES( err, "Illegal state: tablet server .* \\(.*\\): " "unacceptable health status UNAVAILABLE"); } run = false; stopper.join(); stopper_cleanup.cancel(); ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart()); NO_FATALS(cluster_->AssertNoCrashes()); } // The rebalancer should continue working and complete rebalancing successfully // if a new tablet server is added while the cluster is being rebalanced. 
class TserverAddedDuringRebalancingTest : public RebalancingTest {
 public:
  TserverAddedDuringRebalancingTest()
      : RebalancingTest(10 /* num_tables */) {
  }
};
INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest,
                        ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
  if (!AllowSlowTests()) {
    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
    return;
  }

  const vector<string> kTserverExtraFlags = {
    // Slow down tablet copy to make the rebalancing step run longer
    // and become observable via tablet data states output by ksck.
    "--tablet_copy_download_file_inject_latency_ms=1500",
    "--follower_unavailable_considered_failed_sec=30",
  };
  NO_FATALS(Prepare(kTserverExtraFlags));

  const vector<string> tool_args = {
    "cluster",
    "rebalance",
    cluster_->master()->bound_rpc_addr().ToString(),
  };

  atomic<bool> run(true);
  thread runner([&]() {
    while (run) {
      string out;
      string err;
      const auto s = RunKuduTool(tool_args, &out, &err);
      CHECK(s.ok()) << ToolRunInfo(s, out, err);
    }
  });
  auto runner_cleanup = MakeScopedCleanup([&]() {
    run = false;
    runner.join();
  });

  while (!IsRebalancingInProgress()) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  // It's time to sneak in and add a new tablet server.
  ASSERT_OK(cluster_->AddTabletServer());

  run = false;
  runner.join();
  runner_cleanup.cancel();

  // The rebalancer should not fail, and eventually, after a new tablet server
  // is added, the cluster should become balanced.
  ASSERT_EVENTUALLY([&]() {
    string out;
    string err;
    const auto s = RunKuduTool(tool_args, &out, &err);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
    ASSERT_STR_CONTAINS(
        out, "rebalancing is complete: cluster is balanced (moved 0 replicas)")
        << "stderr: " << err;
  });

  NO_FATALS(cluster_->AssertNoCrashes());
  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}

// Run the rebalancer in an 'election storm' environment and make sure the
// rebalancer does not exit prematurely or exhibit any other unexpected
// behavior.
class RebalancingDuringElectionStormTest : public RebalancingTest {
};
INSTANTIATE_TEST_CASE_P(, RebalancingDuringElectionStormTest,
                        ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(RebalancingDuringElectionStormTest, RoundRobin) {
  if (!AllowSlowTests()) {
    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
    return;
  }
  NO_FATALS(Prepare());

  atomic<bool> elector_run(true);
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
  // The timeout is a time-to-run for the stormy elector thread as well.
  // A longer timeout for the workload is not needed under TSAN/ASAN:
  // it's not required that everything generated actually gets written.
  const auto timeout = MonoDelta::FromSeconds(5);
#else
  const auto timeout = MonoDelta::FromSeconds(10);
#endif
  const auto start_time = MonoTime::Now();
  thread elector([&]() {
    // Minimum viable divider for modulo ('%') to allow the result to grow by
    // the rules below.
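    // The pause between election rounds starts at under 2 ms and grows by 10%
    // per iteration (capped at 2000 ms below), so the 'storm' is most intense
    // at the beginning and gradually calms down.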
auto max_sleep_ms = 2.0; while (elector_run && MonoTime::Now() < start_time + timeout) { for (const auto& e : tablet_servers_) { const auto& ts_uuid = e.first; const auto* ts = e.second; vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; auto const s = itest::ListTablets(ts, timeout, &tablets); if (!s.ok()) { LOG(WARNING) << ts_uuid << ": failed to get tablet list :" << s.ToString(); continue; } consensus::ConsensusServiceProxy proxy( cluster_->messenger(), cluster_->tablet_server_by_uuid(ts_uuid)->bound_rpc_addr(), "tserver " + ts_uuid); for (const auto& tablet : tablets) { const auto& tablet_id = tablet.tablet_status().tablet_id(); consensus::RunLeaderElectionRequestPB req; req.set_tablet_id(tablet_id); req.set_dest_uuid(ts_uuid); rpc::RpcController rpc; rpc.set_timeout(timeout); consensus::RunLeaderElectionResponsePB resp; WARN_NOT_OK(proxy.RunLeaderElection(req, &resp, &rpc), Substitute("failed to start election for tablet $0", tablet_id)); } if (!elector_run || start_time + timeout <= MonoTime::Now()) { break; } auto sleep_ms = rand() % static_cast<int>(max_sleep_ms); SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); max_sleep_ms = std::min(max_sleep_ms * 1.1, 2000.0); } } }); auto elector_cleanup = MakeScopedCleanup([&]() { elector_run = false; elector.join(); }); #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER) vector<unique_ptr<TestWorkload>> workloads; for (auto i = 0; i < num_tables_; ++i) { const string table_name = Substitute(table_name_pattern_, i); // The workload is light (1 thread, 1 op batches) and lenient to failures. unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get())); workload->set_table_name(table_name); workload->set_num_replicas(rep_factor_); workload->set_num_write_threads(1); workload->set_write_batch_size(1); workload->set_write_timeout_millis(timeout.ToMilliseconds()); workload->set_already_present_allowed(true); workload->set_remote_error_allowed(true); workload->set_timeout_allowed(true); workload->Setup(); workload->Start(); workloads.emplace_back(std::move(workload)); } #endif const vector<string> tool_args = { "cluster", "rebalance", cluster_->master()->bound_rpc_addr().ToString(), }; while (MonoTime::Now() < start_time + timeout) { // Rebalancer should not report any errors even if it's an election storm // unless a tablet server is reported as unavailable by ksck: the latter // usually happens because GetConsensusState requests are dropped due to // backpressure. string out; string err; const Status s = RunKuduTool(tool_args, &out, &err); if (s.ok()) { ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") << ToolRunInfo(s, out, err); } else { ASSERT_STR_CONTAINS(err, "unacceptable health status UNAVAILABLE") << ToolRunInfo(s, out, err); } } elector_run = false; elector.join(); elector_cleanup.cancel(); #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER) for (auto& workload : workloads) { workload->StopAndJoin(); } #endif // There might be some re-replication started as a result of election storm, // etc. Eventually, the system should heal itself and 'kudu cluster ksck' // should report no issues. ASSERT_EVENTUALLY([&]() { ASSERT_TOOL_OK( "cluster", "ksck", cluster_->master()->bound_rpc_addr().ToString() ) }); // The rebalancer should successfully rebalance the cluster after ksck // reported 'all is well'. 
{ string out; string err; const Status s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") << ToolRunInfo(s, out, err); } NO_FATALS(cluster_->AssertNoCrashes()); } // A test to verify how the rebalancer handles replicas of single-replica // tablets in case of various values of the '--move_single_replicas' flag // and replica management schemes. class RebalancerAndSingleReplicaTablets : public AdminCliTest, public ::testing::WithParamInterface<tuple<string, Kudu1097>> { }; INSTANTIATE_TEST_CASE_P(, RebalancerAndSingleReplicaTablets, ::testing::Combine(::testing::Values("auto", "enabled", "disabled"), ::testing::Values(Kudu1097::Disable, Kudu1097::Enable))); TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) { if (!AllowSlowTests()) { LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; return; } constexpr auto kRepFactor = 1; constexpr auto kNumTservers = 2 * (kRepFactor + 1); constexpr auto kNumTables = kNumTservers; constexpr auto kTserverUnresponsiveMs = 3000; const auto& param = GetParam(); const auto& move_single_replica = std::get<0>(param); const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable); const string table_name_pattern = "rebalance_test_table_$0"; const vector<string> master_flags = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs), }; const vector<string> tserver_flags = { Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), }; FLAGS_num_tablet_servers = kNumTservers; FLAGS_num_replicas = kRepFactor; NO_FATALS(BuildAndStart(tserver_flags, master_flags)); // Keep running only (kRepFactor + 1) tablet servers and shut down the rest. for (auto i = kRepFactor + 1; i < kNumTservers; ++i) { cluster_->tablet_server(i)->Shutdown(); } // Wait for the catalog manager to understand that only (kRepFactor + 1) // tablet servers are available. SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4)); // Create few tables with their tablet replicas landing only on those // (kRepFactor + 1) running tablet servers. KuduSchema client_schema(client::KuduSchemaFromSchema(schema_)); for (auto i = 0; i < kNumTables; ++i) { const string table_name = Substitute(table_name_pattern, i); unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); ASSERT_OK(table_creator->table_name(table_name) .schema(&client_schema) .add_hash_partitions({ "key" }, 3) .num_replicas(kRepFactor) .Create()); ASSERT_TOOL_OK( "perf", "loadgen", cluster_->master()->bound_rpc_addr().ToString(), Substitute("--table_name=$0", table_name), // Don't need much data in there. "--num_threads=1", "--num_rows_per_thread=1", ); } for (auto i = kRepFactor + 1; i < kNumTservers; ++i) { ASSERT_OK(cluster_->tablet_server(i)->Restart()); } string out; string err; const Status s = RunKuduTool({ "cluster", "rebalance", cluster_->master()->bound_rpc_addr().ToString(), Substitute("--move_single_replicas=$0", move_single_replica), }, &out, &err); ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") << "stderr: " << err; if (move_single_replica == "enabled" || (move_single_replica == "auto" && is_343_scheme)) { // Should move appropriate replicas of single-replica tablets. 
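    // In other words, the tool must report a non-zero number of moved replicas
    // and must not log the 'has single replica, skipping' message for the
    // tablets involved.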
ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete: cluster is balanced (moved 0 replicas)") << "stderr: " << err; ASSERT_STR_NOT_CONTAINS(err, "has single replica, skipping"); } else { ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced (moved 0 replicas)") << "stderr: " << err; ASSERT_STR_MATCHES(err, "tablet .* of table '.*' (.*) has single replica, skipping"); } NO_FATALS(cluster_->AssertNoCrashes()); ClusterVerifier v(cluster_.get()); NO_FATALS(v.CheckCluster()); } } // namespace tools } // namespace kudu