common/protobuf/kudu/rpc/rpc_stub-test.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <atomic>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc-test-base.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/rtest.pb.h"
#include "kudu/rpc/rtest.proxy.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/user.h"

DEFINE_bool(is_panic_test_child, false, "Used by TestRpcPanic");
DECLARE_bool(socket_inject_short_recvs);

using kudu::pb_util::SecureDebugString;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using base::subtle::NoBarrier_Load;

namespace kudu {
namespace rpc {

class RpcStubTest : public RpcTestBase {
 public:
  void SetUp() override {
    RpcTestBase::SetUp();
    // Use a shorter queue length since some tests below need to start enough
    // threads to saturate the queue.
    service_queue_length_ = 10;
    ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr_));
    ASSERT_OK(CreateMessenger("Client", &client_messenger_));
  }

 protected:
  void SendSimpleCall() {
    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

    RpcController controller;
    AddRequestPB req;
    req.set_x(10);
    req.set_y(20);
    AddResponsePB resp;
    ASSERT_OK(p.Add(req, &resp, &controller));
    ASSERT_EQ(30, resp.result());
  }

  Sockaddr server_addr_;
  shared_ptr<Messenger> client_messenger_;
};

TEST_F(RpcStubTest, TestSimpleCall) {
  SendSimpleCall();
}

// Regression test for a bug in which we would not properly parse a call
// response when recv() returned a 'short read'. This injects such short
// reads and then makes a number of calls.
TEST_F(RpcStubTest, TestShortRecvs) {
  FLAGS_socket_inject_short_recvs = true;
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  for (int i = 0; i < 100; i++) {
    NO_FATALS(SendSimpleCall());
  }
}

// Test calls which are rather large.
// This test sends many of them at once using the async API and then
// waits for them all to return. This is meant to ensure that the
// IO threads can deal with read/write calls that don't succeed
// in sending the entire data in one go.
TEST_F(RpcStubTest, TestBigCallData) {
  const int kNumSentAtOnce = 20;
  const size_t kMessageSize = 5 * 1024 * 1024;
  string data;
  data.resize(kMessageSize);

  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  EchoRequestPB req;
  req.set_data(data);

  vector<unique_ptr<EchoResponsePB>> resps;
  vector<unique_ptr<RpcController>> controllers;

  CountDownLatch latch(kNumSentAtOnce);
  for (int i = 0; i < kNumSentAtOnce; i++) {
    resps.emplace_back(new EchoResponsePB);
    controllers.emplace_back(new RpcController);

    p.EchoAsync(req, resps.back().get(), controllers.back().get(),
                [&latch]() { latch.CountDown(); });
  }

  latch.Wait();

  for (const auto& c : controllers) {
    ASSERT_OK(c->status());
  }
}

TEST_F(RpcStubTest, TestRespondDeferred) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  RpcController controller;
  SleepRequestPB req;
  req.set_sleep_micros(1000);
  req.set_deferred(true);
  SleepResponsePB resp;
  ASSERT_OK(p.Sleep(req, &resp, &controller));
}

// Test that the default user credentials are propagated to the server.
TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  string expected;
  ASSERT_OK(GetLoggedInUser(&expected));

  RpcController controller;
  WhoAmIRequestPB req;
  WhoAmIResponsePB resp;
  ASSERT_OK(p.WhoAmI(req, &resp, &controller));
  ASSERT_EQ(expected, resp.credentials().real_user());
  ASSERT_FALSE(resp.credentials().has_effective_user());
}

// Test that the user can specify other credentials.
TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
  const char* const kFakeUserName = "some fake user";
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  UserCredentials creds;
  creds.set_real_user(kFakeUserName);
  p.set_user_credentials(creds);

  RpcController controller;
  WhoAmIRequestPB req;
  WhoAmIResponsePB resp;
  ASSERT_OK(p.WhoAmI(req, &resp, &controller));
  ASSERT_EQ(kFakeUserName, resp.credentials().real_user());
  ASSERT_FALSE(resp.credentials().has_effective_user());
}

TEST_F(RpcStubTest, TestAuthorization) {
  // First test calling WhoAmI() as user "alice", who is disallowed.
  {
    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
    UserCredentials creds;
    creds.set_real_user("alice");
    p.set_user_credentials(creds);

    // Alice is disallowed by all RPCs.
    {
      RpcController controller;
      WhoAmIRequestPB req;
      WhoAmIResponsePB resp;
      Status s = p.WhoAmI(req, &resp, &controller);
      ASSERT_FALSE(s.ok());
      ASSERT_EQ(s.ToString(),
                "Remote error: Not authorized: alice is not allowed to call this method");
    }

    // KUDU-2540: Authorization failures on exactly-once RPCs cause FATAL
    {
      RpcController controller;
      unique_ptr<RequestIdPB> request_id(new RequestIdPB);
      request_id->set_client_id("client-id");
      request_id->set_attempt_no(0);
      request_id->set_seq_no(0);
      request_id->set_first_incomplete_seq_no(-1);
      controller.SetRequestIdPB(std::move(request_id));

      ExactlyOnceRequestPB req;
      req.set_value_to_add(1);
      ExactlyOnceResponsePB resp;
      Status s = p.AddExactlyOnce(req, &resp, &controller);
      ASSERT_FALSE(s.ok());
      ASSERT_EQ(s.ToString(),
                "Remote error: Not authorized: alice is not allowed to call this method");
    }
  }

  // Try some calls as "bob".
  {
    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
    UserCredentials creds;
    creds.set_real_user("bob");
    p.set_user_credentials(creds);

    // "bob" is allowed to call WhoAmI().
    {
      RpcController controller;
      WhoAmIRequestPB req;
      WhoAmIResponsePB resp;
      ASSERT_OK(p.WhoAmI(req, &resp, &controller));
    }

    // "bob" is not allowed to call "Sleep".
    {
      RpcController controller;
      SleepRequestPB req;
      req.set_sleep_micros(10);
      SleepResponsePB resp;
      Status s = p.Sleep(req, &resp, &controller);
      ASSERT_EQ(s.ToString(),
                "Remote error: Not authorized: bob is not allowed to call this method");
    }
  }
}

// Test that the user's remote address is accessible to the server.
TEST_F(RpcStubTest, TestRemoteAddress) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  RpcController controller;
  WhoAmIRequestPB req;
  WhoAmIResponsePB resp;
  ASSERT_OK(p.WhoAmI(req, &resp, &controller));
  ASSERT_STR_CONTAINS(resp.address(), "127.0.0.1:");
}

////////////////////////////////////////////////////////////
// Tests for error cases
////////////////////////////////////////////////////////////

// Test sending a PB parameter with a missing field, where the client
// thinks it has sent a full PB. (eg due to version mismatch)
TEST_F(RpcStubTest, TestCallWithInvalidParam) {
  Proxy p(client_messenger_, server_addr_, server_addr_.host(),
          CalculatorService::static_service_name());

  rpc_test::AddRequestPartialPB req;
  req.set_x(rand());
  // AddRequestPartialPB is missing the 'y' field.
  AddResponsePB resp;
  RpcController controller;
  Status s = p.SyncRequest("Add", req, &resp, &controller);
  ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(),
                      "Invalid argument: invalid parameter for call "
                      "kudu.rpc_test.CalculatorService.Add: "
                      "missing fields: y");
}

// Wrapper around AtomicIncrement, since AtomicIncrement returns the 'old'
// value, and our callback needs to be a void function.
static void DoIncrement(Atomic32* count) {
  base::subtle::Barrier_AtomicIncrement(count, 1);
}

// Test sending a PB parameter with a missing field on the client side.
// This also ensures that the async callback is only called once
// (regression test for a previously-encountered bug).
TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  RpcController controller;
  AddRequestPB req;
  req.set_x(10);
  // Request is missing the 'y' field.
  AddResponsePB resp;
  Atomic32 callback_count = 0;
  p.AddAsync(req, &resp, &controller,
             [&callback_count]() { DoIncrement(&callback_count); });
  while (NoBarrier_Load(&callback_count) == 0) {
    SleepFor(MonoDelta::FromMicroseconds(10));
  }
  SleepFor(MonoDelta::FromMicroseconds(100));
  ASSERT_EQ(1, NoBarrier_Load(&callback_count));
  ASSERT_STR_CONTAINS(controller.status().ToString(),
                      "Invalid argument: invalid parameter for call "
                      "kudu.rpc_test.CalculatorService.Add: missing fields: y");
}

TEST_F(RpcStubTest, TestResponseWithMissingField) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  RpcController rpc;
  TestInvalidResponseRequestPB req;
  TestInvalidResponseResponsePB resp;
  req.set_error_type(rpc_test::TestInvalidResponseRequestPB_ErrorType_MISSING_REQUIRED_FIELD);
  Status s = p.TestInvalidResponse(req, &resp, &rpc);
  ASSERT_STR_CONTAINS(s.ToString(),
                      "invalid RPC response, missing fields: response");
}

// Test case where the server responds with a message which is larger than the maximum
// configured RPC message size. The server should send the response, but the client
// will reject it.
TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  RpcController rpc;
  TestInvalidResponseRequestPB req;
  TestInvalidResponseResponsePB resp;
  req.set_error_type(rpc_test::TestInvalidResponseRequestPB_ErrorType_RESPONSE_TOO_LARGE);
  Status s = p.TestInvalidResponse(req, &resp, &rpc);
  ASSERT_STR_CONTAINS(s.ToString(), "Network error: RPC frame had a length of");
}

// Test sending a call which isn't implemented by the server.
TEST_F(RpcStubTest, TestCallMissingMethod) {
  Proxy p(client_messenger_, server_addr_, server_addr_.host(),
          CalculatorService::static_service_name());

  Status s = DoTestSyncCall(&p, "DoesNotExist");
  ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist");
}

TEST_F(RpcStubTest, TestApplicationError) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  RpcController controller;
  SleepRequestPB req;
  SleepResponsePB resp;
  req.set_sleep_micros(1);
  req.set_return_app_error(true);
  Status s = p.Sleep(req, &resp, &controller);
  ASSERT_TRUE(s.IsRemoteError());
  EXPECT_EQ("Remote error: Got some error", s.ToString());
  EXPECT_EQ("message: \"Got some error\"\n"
            "[kudu.rpc_test.CalculatorError.app_error_ext] {\n"
            " extra_error_data: \"some application-specific error data\"\n"
            "}\n",
            SecureDebugString(*controller.error_response()));
}

TEST_F(RpcStubTest, TestRpcPanic) {
  if (!FLAGS_is_panic_test_child) {
    // This is a poor man's death test. We call this same
    // test case, but set the above flag, and verify that
    // it aborted. gtest death tests don't work here because
    // there are already threads started up.
    vector<string> argv;
    string executable_path;
    CHECK_OK(env_->GetExecutablePath(&executable_path));
    argv.push_back(executable_path);
    argv.emplace_back("--is_panic_test_child");
    argv.emplace_back("--gtest_filter=RpcStubTest.TestRpcPanic");

    Subprocess subp(argv);
    subp.ShareParentStderr(false);
    CHECK_OK(subp.Start());
    FILE* in = fdopen(subp.from_child_stderr_fd(), "r");
    PCHECK(in);

    // Search for string "Test method panicking!" somewhere in stderr
    char buf[1024];
    bool found_string = false;
    while (fgets(buf, sizeof(buf), in)) {
      if (strstr(buf, "Test method panicking!")) {
        found_string = true;
        break;
      }
    }
    CHECK(found_string);

    // Check return status
    int wait_status = 0;
    CHECK_OK(subp.Wait(&wait_status));
    CHECK(!WIFEXITED(wait_status)); // should not have been successful
    if (WIFSIGNALED(wait_status)) {
      CHECK_EQ(WTERMSIG(wait_status), SIGABRT);
    } else {
      // On some systems, we get exit status 134 from SIGABRT rather than
      // WIFSIGNALED getting flagged.
      CHECK_EQ(WEXITSTATUS(wait_status), 134);
    }
    return;
  } else {
    // Before forcing the panic, explicitly remove the test directory. This
    // should be safe; this test doesn't generate any data.
    CHECK_OK(env_->DeleteRecursively(test_dir_));

    // Make an RPC which causes the server to abort.
    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
    RpcController controller;
    PanicRequestPB req;
    PanicResponsePB resp;
    p.Panic(req, &resp, &controller);
  }
}

struct AsyncSleep {
  AsyncSleep() : latch(1) {}

  RpcController rpc;
  SleepRequestPB req;
  SleepResponsePB resp;
  CountDownLatch latch;
};

TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  vector<unique_ptr<AsyncSleep>> sleeps;
  sleeps.reserve(n_worker_threads_);

  // Send enough sleep calls to occupy the worker threads.
  for (int i = 0; i < n_worker_threads_; i++) {
    unique_ptr<AsyncSleep> sleep(new AsyncSleep);
    sleep->rpc.set_timeout(MonoDelta::FromSeconds(1));
    sleep->req.set_sleep_micros(1000 * 1000); // 1sec
    auto& l = sleep->latch;
    p.SleepAsync(sleep->req, &sleep->resp, &sleep->rpc,
                 [&l]() { l.CountDown(); });
    sleeps.emplace_back(std::move(sleep));
  }

  // We asynchronously sent the RPCs above, but the RPCs might still
  // be in the queue. Because the RPC we send next has a lower timeout,
  // it would take priority over the long-timeout RPCs. So, we have to
  // wait until the above RPCs are being processed before we continue
  // the test.
  const Histogram* queue_time_metric = service_pool_->IncomingQueueTimeMetricForTests();
  while (queue_time_metric->TotalCount() < n_worker_threads_) {
    SleepFor(MonoDelta::FromMilliseconds(1));
  }

  // Send another call with a short timeout. This shouldn't get processed, because
  // it'll get stuck in the queue for longer than its timeout.
  ASSERT_EVENTUALLY([&]() {
    RpcController rpc;
    SleepRequestPB req;
    SleepResponsePB resp;
    req.set_sleep_micros(1); // unused but required.
    rpc.set_timeout(MonoDelta::FromMilliseconds(5));
    Status s = p.Sleep(req, &resp, &rpc);
    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
    // Since our timeout was short, it's possible in rare circumstances
    // that we time out the RPC on the outbound queue, in which case
    // we won't trigger the desired behavior here. In that case, the
    // timeout error status would have the string 'ON_OUTBOUND_QUEUE'
    // instead of 'SENT', so this assertion would fail and cause the
    // ASSERT_EVENTUALLY to loop.
    ASSERT_STR_CONTAINS(s.ToString(), "SENT");
  });

  for (const auto& s : sleeps) {
    s->latch.Wait();
  }

  // Verify that the timed-out call got short-circuited before being processed.
  // We may need to loop a short amount of time as we are racing with the reactor
  // thread to process the remaining elements of the queue.
  const Counter* timed_out_in_queue = service_pool_->RpcsTimedOutInQueueMetricForTests();
  ASSERT_EVENTUALLY([&]{
    ASSERT_EQ(1, timed_out_in_queue->value());
  });
}

// Test which ensures that the RPC queue accepts requests with the earliest
// deadline first (EDF), and upon overflow rejects requests with the latest deadlines.
//
// In particular, this simulates a workload experienced with Impala where the local
// impalad would spawn more scanner threads than the total number of handlers plus queue
// slots, guaranteeing that some of those clients would see SERVER_TOO_BUSY rejections on
// scan requests and be forced to back off and retry. Without EDF scheduling, we saw that
// the "unlucky" threads that got rejected would likely continue to get rejected upon
// retries, and some would be starved continually until they missed their overall deadline
// and failed the query.
//
// With EDF scheduling, the retries take priority over the original requests (because
// they retain their original deadlines). This prevents starvation of unlucky threads.
TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
  const int num_client_threads = service_queue_length_ + n_worker_threads_ + 5;
  vector<std::thread> threads;
  vector<int> successes(num_client_threads);
  std::atomic<bool> done(false);
  for (int thread_id = 0; thread_id < num_client_threads; thread_id++) {
    threads.emplace_back([&, thread_id] {
      Random rng(thread_id);
      CalculatorServiceProxy p(
          client_messenger_, server_addr_, server_addr_.host());
      while (!done.load()) {
        // Set a deadline in the future. We'll keep using this same deadline
        // on each of our retries.
        MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(8);

        for (int attempt = 1; !done.load(); attempt++) {
          RpcController controller;
          SleepRequestPB req;
          SleepResponsePB resp;
          controller.set_deadline(deadline);
          req.set_sleep_micros(100000);
          Status s = p.Sleep(req, &resp, &controller);
          if (s.ok()) {
            successes[thread_id]++;
            break;
          }
          // We expect to get SERVER_TOO_BUSY errors because we have more clients than the
          // server has handlers and queue slots. No other errors are expected.
          CHECK(s.IsRemoteError() &&
                controller.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY)
              << "Unexpected RPC failure: " << s.ToString();
          // Randomized exponential backoff (similar to that done by the scanners in the Kudu
          // client).
          int backoff = (0.5 + rng.NextDoubleFraction() * 0.5) *
              (std::min(1 << attempt, 1000));
          VLOG(1) << "backoff " << backoff << "ms";
          SleepFor(MonoDelta::FromMilliseconds(backoff));
        }
      }
    });
  }
  // Let the threads run for 5 seconds before stopping them.
  SleepFor(MonoDelta::FromSeconds(5));
  done.store(true);
  for (auto& t : threads) {
    t.join();
  }

  // Before switching to earliest-deadline-first scheduling, the results
  // here would typically look something like:
  //   1 1 0 1 10 17 6 1 12 12 17 10 8 7 12 9 16 15
  // With the fix, we see something like:
  //   9 9 9 8 9 9 9 9 9 9 9 9 9 9 9 9 9
  LOG(INFO) << "thread RPC success counts: " << successes;

  int sum = 0;
  int min = std::numeric_limits<int>::max();
  for (int x : successes) {
    sum += x;
    min = std::min(min, x);
  }
  int avg = sum / successes.size();
  ASSERT_GT(min, avg / 2)
      << "expected the least lucky thread to have at least half as many successes "
      << "as the average thread: min=" << min << " avg=" << avg;
}

TEST_F(RpcStubTest, TestDumpCallsInFlight) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
  AsyncSleep sleep;
  sleep.req.set_sleep_micros(100 * 1000); // 100ms
  auto& l = sleep.latch;
  p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
               [&l]() { l.CountDown(); });

  // Check the running RPC status on the client messenger.
  DumpConnectionsRequestPB dump_req;
  DumpConnectionsResponsePB dump_resp;
  dump_req.set_include_traces(true);

  ASSERT_OK(client_messenger_->DumpConnections(dump_req, &dump_resp));
  LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp);
  ASSERT_EQ(1, dump_resp.outbound_connections_size());
  ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
  ASSERT_EQ("Sleep", dump_resp.outbound_connections(0).calls_in_flight(0).
            header().remote_method().method_name());
  ASSERT_GT(dump_resp.outbound_connections(0).calls_in_flight(0).micros_elapsed(), 0);

  // And the server messenger.
  // We have to loop this until we find a result since the actual call is sent
  // asynchronously off of the main thread (ie the server may not be handling it yet)
  for (int i = 0; i < 100; i++) {
    dump_resp.Clear();
    ASSERT_OK(server_messenger_->DumpConnections(dump_req, &dump_resp));
    if (dump_resp.inbound_connections_size() > 0 &&
        dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  LOG(INFO) << "server messenger: " << SecureDebugString(dump_resp);
  ASSERT_EQ(1, dump_resp.inbound_connections_size());
  ASSERT_EQ(1, dump_resp.inbound_connections(0).calls_in_flight_size());
  ASSERT_EQ("Sleep", dump_resp.inbound_connections(0).calls_in_flight(0).
            header().remote_method().method_name());
  ASSERT_GT(dump_resp.inbound_connections(0).calls_in_flight(0).micros_elapsed(), 0);
  ASSERT_STR_CONTAINS(dump_resp.inbound_connections(0).calls_in_flight(0).trace_buffer(),
                      "Inserting onto call queue");
  sleep.latch.Wait();
}

TEST_F(RpcStubTest, TestDumpSampledCalls) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  // Issue two calls that fall into different latency buckets.
  AsyncSleep sleeps[2];
  sleeps[0].req.set_sleep_micros(150 * 1000);  // 150ms
  sleeps[1].req.set_sleep_micros(1500 * 1000); // 1500ms

  for (auto& sleep : sleeps) {
    auto& l = sleep.latch;
    p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
                 [&l]() { l.CountDown(); });
  }
  for (auto& sleep : sleeps) {
    sleep.latch.Wait();
  }

  // Dump the sampled RPCs and expect to see the calls above.
  DumpRpczStoreResponsePB sampled_rpcs;
  server_messenger_->rpcz_store()->DumpPB(DumpRpczStoreRequestPB(), &sampled_rpcs);
  EXPECT_EQ(sampled_rpcs.methods_size(), 1);

  ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs),
                      " metrics {\n"
                      " key: \"test_sleep_us\"\n"
                      " value: 150000\n"
                      " }\n");
  ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs),
                      " metrics {\n"
                      " key: \"test_sleep_us\"\n"
                      " value: 1500000\n"
                      " }\n");
  ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs),
                      " metrics {\n"
                      " child_path: \"test_child\"\n"
                      " key: \"related_trace_metric\"\n"
                      " value: 1\n"
                      " }");
  ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), "SleepRequestPB");
  ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), "duration_ms");
}

namespace {

struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> {
};

// Test callback which takes a refcounted pointer.
// We don't use this parameter, but it's used to validate that the bound callback
// is cleared in TestCallbackClearedAfterRunning.
void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refptr) {
  latch->CountDown();
}

} // anonymous namespace

// Verify that, after a call has returned, no copy of the call's callback
// is held. This is important when the callback holds a refcounted ptr,
// since we expect to be able to release that pointer when the call is done.
TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  CountDownLatch latch(1);
  scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest);
  RpcController controller;
  AddRequestPB req;
  req.set_x(10);
  req.set_y(20);
  AddResponsePB resp;
  p.AddAsync(req, &resp, &controller,
             [&latch, my_refptr]() { MyTestCallback(&latch, my_refptr); });
  latch.Wait();

  // The ref count should go back down to 1. However, we need to loop a little
  // bit, since the deref is happening on another thread. If the other thread gets
  // descheduled directly after calling our callback, we'd fail without these sleeps.
  for (int i = 0; i < 100 && !my_refptr->HasOneRef(); i++) {
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  ASSERT_TRUE(my_refptr->HasOneRef());
}

// Regression test for KUDU-1409: if the client reactor thread is blocked (e.g. due to a
// process-wide pause or a slow callback) then we should not cause RPC calls to time out.
TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) {
  CHECK_EQ(client_messenger_->num_reactors(), 1)
      << "This test requires only a single reactor. Otherwise the injected sleep might "
      << "be scheduled on a different reactor than the RPC call.";

  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());

  // Schedule a 1-second sleep on the reactor thread.
  //
  // This will cause the reactor to be blocked while the call response is received, and
  // still be blocked when the timeout would normally occur. Despite this, the call should
  // not time out.
  //
  //   0s         0.5s                   1.2s   1.5s
  //   RPC call running
  //   |---------------------|
  //              Reactor blocked in sleep
  //              |----------------------|
  //                                        \_ RPC would normally time out
  client_messenger_->ScheduleOnReactor([](const Status& s) {
      ThreadRestrictions::ScopedAllowWait allow_wait;
      SleepFor(MonoDelta::FromSeconds(1));
    }, MonoDelta::FromSeconds(0.5));

  RpcController controller;
  SleepRequestPB req;
  SleepResponsePB resp;
  req.set_sleep_micros(800 * 1000);
  controller.set_timeout(MonoDelta::FromMilliseconds(1200));
  ASSERT_OK(p.Sleep(req, &resp, &controller));
}

} // namespace rpc
} // namespace kudu