common/protobuf/kudu/rpc/connection.cc (691 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "kudu/rpc/connection.h" #include <netinet/in.h> #include <netinet/tcp.h> #include <string.h> #include <algorithm> #include <cerrno> #include <iostream> #include <memory> #include <set> #include <string> #include <boost/intrusive/detail/list_iterator.hpp> #include <boost/intrusive/list.hpp> #include <ev.h> #include <glog/logging.h> #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/human_readable.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/inbound_call.h" #include "kudu/rpc/messenger.h" #include "kudu/rpc/outbound_call.h" #include "kudu/rpc/reactor.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/rpc_introspection.pb.h" #include "kudu/rpc/transfer.h" #include "kudu/security/tls_socket.h" #include "kudu/util/errno.h" #include "kudu/util/faststring.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/net/socket.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" #include <sys/socket.h> #ifdef __linux__ #include <sys/ioctl.h> #endif using kudu::security::TlsSocket; using std::includes; using std::set; using std::shared_ptr; using std::unique_ptr; using strings::Substitute; namespace kudu { namespace rpc { typedef OutboundCall::Phase Phase; namespace { // tcp_info struct duplicated from linux/tcp.h. // // This allows us to decouple the compile-time Linux headers from the // runtime Linux kernel. The compile-time headers (and kernel) might be // older than the runtime kernel, in which case an ifdef-based approach // wouldn't allow us to get all of the info available. // // NOTE: this struct has been annotated with some local notes about the // contents of each field. struct tcp_info { // Various state-tracking information. // ------------------------------------------------------------ uint8_t tcpi_state; uint8_t tcpi_ca_state; uint8_t tcpi_retransmits; uint8_t tcpi_probes; uint8_t tcpi_backoff; uint8_t tcpi_options; uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4; uint8_t tcpi_delivery_rate_app_limited:1; // Configurations. // ------------------------------------------------------------ uint32_t tcpi_rto; uint32_t tcpi_ato; uint32_t tcpi_snd_mss; uint32_t tcpi_rcv_mss; // Counts of packets in various states in the outbound queue. // At first glance one might think these are monotonic counters, but // in fact they are instantaneous counts of queued packets and thus // not very useful for our purposes. // ------------------------------------------------------------ // Number of packets outstanding that haven't been acked. uint32_t tcpi_unacked; // Number of packets outstanding that have been selective-acked. uint32_t tcpi_sacked; // Number of packets outstanding that have been deemed lost (a SACK arrived // for a later packet) uint32_t tcpi_lost; // Number of packets in the queue that have been retransmitted. uint32_t tcpi_retrans; // The number of packets towards the highest SACKed sequence number // (some measure of reording, removed in later Linux versions by // 737ff314563ca27f044f9a3a041e9d42491ef7ce) uint32_t tcpi_fackets; // Times when various events occurred. // ------------------------------------------------------------ uint32_t tcpi_last_data_sent; uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */ uint32_t tcpi_last_data_recv; uint32_t tcpi_last_ack_recv; // Path MTU. uint32_t tcpi_pmtu; // Receiver slow start threshold. uint32_t tcpi_rcv_ssthresh; // Smoothed RTT estimate and variance based on the time between sending data and receiving // corresponding ACK. See https://tools.ietf.org/html/rfc2988 for details. uint32_t tcpi_rtt; uint32_t tcpi_rttvar; // Slow start threshold. uint32_t tcpi_snd_ssthresh; // Sender congestion window (in number of MSS-sized packets) uint32_t tcpi_snd_cwnd; // Advertised MSS. uint32_t tcpi_advmss; // Amount of packet reordering allowed. uint32_t tcpi_reordering; // Receiver-side RTT estimate per the Dynamic Right Sizing algorithm: // // "A system that is only transmitting acknowledgements can still estimate the round-trip // time by observing the time between when a byte is first acknowledged and the receipt of // data that is at least one window beyond the sequence number that was acknowledged. If the // sender is being throttled by the network, this estimate will be valid. However, if the // sending application did not have any data to send, the measured time could be much larger // than the actual round-trip time. Thus this measurement acts only as an upper-bound on the // round-trip time and should be be used only when it is the only source of round-trip time // information." uint32_t tcpi_rcv_rtt; uint32_t tcpi_rcv_space; // Total number of retransmitted packets. uint32_t tcpi_total_retrans; // Pacing-related metrics. uint64_t tcpi_pacing_rate; uint64_t tcpi_max_pacing_rate; // Total bytes ACKed by remote peer. uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */ // Total bytes received (for which ACKs have been sent out). uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */ // Segments sent and received. uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */ uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */ // The following metrics are quite new and not in el7. // ------------------------------------------------------------ uint32_t tcpi_notsent_bytes; uint32_t tcpi_min_rtt; uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */ uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */ // Calculated rate at which data was delivered. uint64_t tcpi_delivery_rate; // Timers for various states. uint64_t tcpi_busy_time; /* Time (usec) busy sending data */ uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */ uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */ }; } // anonymous namespace /// /// Connection /// Connection::Connection(ReactorThread *reactor_thread, Sockaddr remote, unique_ptr<Socket> socket, Direction direction, CredentialsPolicy policy) : reactor_thread_(reactor_thread), remote_(remote), socket_(std::move(socket)), direction_(direction), last_activity_time_(MonoTime::Now()), is_epoll_registered_(false), call_id_(std::numeric_limits<int32_t>::max()), credentials_policy_(policy), negotiation_complete_(false), is_confidential_(false), scheduled_for_shutdown_(false) { } Status Connection::SetNonBlocking(bool enabled) { return socket_->SetNonBlocking(enabled); } Status Connection::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) { DCHECK_GT(idle_time_s, 0); DCHECK_GE(retry_time_s, 0); DCHECK_GE(num_retries, 0); return socket_->SetTcpKeepAlive(std::max(1, idle_time_s), std::max(0, retry_time_s), std::max(0, num_retries)); } void Connection::EpollRegister(ev::loop_ref& loop) { DCHECK(reactor_thread_->IsCurrentThread()); DVLOG(4) << "Registering connection for epoll: " << ToString(); write_io_.set(loop); write_io_.set(socket_->GetFd(), ev::WRITE); write_io_.set<Connection, &Connection::WriteHandler>(this); if (direction_ == CLIENT && negotiation_complete_) { write_io_.start(); } read_io_.set(loop); read_io_.set(socket_->GetFd(), ev::READ); read_io_.set<Connection, &Connection::ReadHandler>(this); read_io_.start(); is_epoll_registered_ = true; } Connection::~Connection() { // Must clear the outbound_transfers_ list before deleting. CHECK(outbound_transfers_.begin() == outbound_transfers_.end()); // It's crucial that the connection is Shutdown first -- otherwise // our destructor will end up calling read_io_.stop() and write_io_.stop() // from a possibly non-reactor thread context. This can then make all // hell break loose with libev. CHECK(!is_epoll_registered_); } bool Connection::Idle() const { DCHECK(reactor_thread_->IsCurrentThread()); // check if we're in the middle of receiving something InboundTransfer *transfer = inbound_.get(); if (transfer && (transfer->TransferStarted())) { return false; } // check if we still need to send something if (!outbound_transfers_.empty()) { return false; } // can't kill a connection if calls are waiting response if (!awaiting_response_.empty()) { return false; } if (!calls_being_handled_.empty()) { return false; } // We are not idle if we are in the middle of connection negotiation. if (!negotiation_complete_) { return false; } return true; } void Connection::Shutdown(const Status &status, unique_ptr<ErrorStatusPB> rpc_error) { DCHECK(reactor_thread_->IsCurrentThread()); shutdown_status_ = status.CloneAndPrepend("RPC connection failed"); if (inbound_ && inbound_->TransferStarted()) { double secs_since_active = (reactor_thread_->cur_time() - last_activity_time_).ToSeconds(); LOG(WARNING) << "Shutting down " << ToString() << " with pending inbound data (" << inbound_->StatusAsString() << ", last active " << HumanReadableElapsedTime::ToShortString(secs_since_active) << " ago, status=" << status.ToString() << ")"; } // Clear any calls which have been sent and were awaiting a response. for (const car_map_t::value_type &v : awaiting_response_) { CallAwaitingResponse *c = v.second; if (c->call) { // Make sure every awaiting call receives the error info, if any. unique_ptr<ErrorStatusPB> error; if (rpc_error) { error.reset(new ErrorStatusPB(*rpc_error)); } c->call->SetFailed(status, negotiation_complete_ ? Phase::REMOTE_CALL : Phase::CONNECTION_NEGOTIATION, std::move(error)); } // And we must return the CallAwaitingResponse to the pool car_pool_.Destroy(c); } awaiting_response_.clear(); // Clear any outbound transfers. while (!outbound_transfers_.empty()) { OutboundTransfer *t = &outbound_transfers_.front(); outbound_transfers_.pop_front(); delete t; } read_io_.stop(); write_io_.stop(); is_epoll_registered_ = false; if (socket_) { WARN_NOT_OK(socket_->Close(), "Error closing socket"); } } void Connection::QueueOutbound(unique_ptr<OutboundTransfer> transfer) { DCHECK(reactor_thread_->IsCurrentThread()); if (!shutdown_status_.ok()) { // If we've already shut down, then we just need to abort the // transfer rather than bothering to queue it. transfer->Abort(shutdown_status_); return; } DVLOG(3) << "Queueing transfer: " << transfer->HexDump(); outbound_transfers_.push_back(*transfer.release()); if (negotiation_complete_ && !write_io_.is_active()) { // Optimistically assume that the socket is writable if we didn't already // have something queued. if (ProcessOutboundTransfers() == kMoreToSend) { write_io_.start(); } } } Connection::CallAwaitingResponse::~CallAwaitingResponse() { DCHECK(conn->reactor_thread_->IsCurrentThread()); } void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) { if (remaining_timeout > 0) { if (watcher.remaining() < -1.0) { LOG(WARNING) << "RPC call timeout handler was delayed by " << -watcher.remaining() << "s! This may be due to a process-wide " << "pause such as swapping, logging-related delays, or allocator lock " << "contention. Will allow an additional " << remaining_timeout << "s for a response."; } watcher.set(remaining_timeout, 0); watcher.start(); remaining_timeout = 0; return; } conn->HandleOutboundCallTimeout(this); } void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) { DCHECK(reactor_thread_->IsCurrentThread()); if (!car->call) { // The RPC may have been cancelled before the timeout was hit. return; } // The timeout timer is stopped by the car destructor exiting Connection::HandleCallResponse() DCHECK(!car->call->IsFinished()); // Mark the call object as failed. car->call->SetTimedOut(negotiation_complete_ ? Phase::REMOTE_CALL : Phase::CONNECTION_NEGOTIATION); // Test cancellation when 'car->call' is in 'TIMED_OUT' state MaybeInjectCancellation(car->call); // Drop the reference to the call. If the original caller has moved on after // seeing the timeout, we no longer need to hold onto the allocated memory // from the request. car->call.reset(); // We still leave the CallAwaitingResponse in the map -- this is because we may still // receive a response from the server, and we don't want a spurious log message // when we do finally receive the response. The fact that CallAwaitingResponse::call // is a NULL pointer indicates to the response processing code that the call // already timed out. } void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) { CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_, call->call_id()); if (car != nullptr) { // car->call may be NULL if the call has timed out already. DCHECK(!car->call || car->call.get() == call.get()); car->call.reset(); } } // Inject a cancellation when 'call' is in state 'FLAGS_rpc_inject_cancellation_state'. void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> &call) { if (PREDICT_FALSE(call->ShouldInjectCancellation())) { reactor_thread_->reactor()->messenger()->QueueCancellation(call); } } // Callbacks after sending a call on the wire. // This notifies the OutboundCall object to change its state to SENT once it // has been fully transmitted. struct CallTransferCallbacks : public TransferCallbacks { public: explicit CallTransferCallbacks(shared_ptr<OutboundCall> call, Connection *conn) : call_(std::move(call)), conn_(conn) {} virtual void NotifyTransferFinished() OVERRIDE { // TODO: would be better to cancel the transfer while it is still on the queue if we // timed out before the transfer started, but there is still a race in the case of // a partial send that we have to handle here if (call_->IsFinished()) { DCHECK(call_->IsTimedOut() || call_->IsCancelled()); } else { call_->SetSent(); // Test cancellation when 'call_' is in 'SENT' state. conn_->MaybeInjectCancellation(call_); } delete this; } virtual void NotifyTransferAborted(const Status &status) OVERRIDE { VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: " << status.ToString(); delete this; } private: shared_ptr<OutboundCall> call_; Connection* conn_; }; void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) { DCHECK(call); DCHECK_EQ(direction_, CLIENT); DCHECK(reactor_thread_->IsCurrentThread()); if (PREDICT_FALSE(!shutdown_status_.ok())) { // Already shutdown call->SetFailed(shutdown_status_, negotiation_complete_ ? Phase::REMOTE_CALL : Phase::CONNECTION_NEGOTIATION); return; } // At this point the call has a serialized request, but no call header, since we haven't // yet assigned a call ID. DCHECK(!call->call_id_assigned()); // We shouldn't reach this point if 'call' was requested to be cancelled. DCHECK(!call->cancellation_requested()); // Assign the call ID. int32_t call_id = GetNextCallId(); call->set_call_id(call_id); // Serialize the actual bytes to be put on the wire. TransferPayload tmp_slices; call->SerializeTo(&tmp_slices); call->SetQueued(); // Test cancellation when 'call_' is in 'ON_OUTBOUND_QUEUE' state. MaybeInjectCancellation(call); scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct())); car->conn = this; car->call = call; // Set up the timeout timer. const MonoDelta &timeout = call->controller()->timeout(); if (timeout.Initialized()) { reactor_thread_->RegisterTimeout(&car->timeout_timer); car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*) &CallAwaitingResponse::HandleTimeout>(car.get()); // For calls with a timeout of at least 500ms, we actually run the timeout // handler in two stages. The first timeout fires with a timeout 10% less // than the user-specified one. It then schedules a second timeout for the // remaining amount of time. // // The purpose of this two-stage timeout is to be more robust when the client // has some process-wide pause, such as lock contention in tcmalloc, or a // reactor callback that blocks in glog. Consider the following case: // // T = 0s user issues an RPC with 5 second timeout // T = 0.5s - 6s process is blocked // T = 6s process unblocks, and the timeout fires (1s late) // // Without the two-stage timeout, we would determine that the call had timed out, // even though it's likely that the response is waiting on our TCP socket. // With the two-stage timeout, we'll end up with: // // T = 0s user issues an RPC with 5 second timeout // T = 0.5s - 6s process is blocked // T = 6s process unblocks, and the first-stage timeout fires (1.5s late) // T = 6s - 6.200s time for the client to read the response which is waiting // T = 6.200s if the response was not actually available, we'll time out here // // We don't bother with this logic for calls with very short timeouts - assumedly // a user setting such a short RPC timeout is well equipped to handle one. double time = timeout.ToSeconds(); if (time >= 0.5) { car->remaining_timeout = time * 0.1; time -= car->remaining_timeout; } else { car->remaining_timeout = 0; } car->timeout_timer.set(time, 0); car->timeout_timer.start(); } TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this); awaiting_response_[call_id] = car.release(); QueueOutbound(unique_ptr<OutboundTransfer>( OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, cb))); } // Callbacks for sending an RPC call response from the server. // This takes ownership of the InboundCall object so that, once it has // been responded to, we can free up all of the associated memory. struct ResponseTransferCallbacks : public TransferCallbacks { public: ResponseTransferCallbacks(unique_ptr<InboundCall> call, Connection *conn) : call_(std::move(call)), conn_(conn) {} ~ResponseTransferCallbacks() { // Remove the call from the map. InboundCall *call_from_map = EraseKeyReturnValuePtr( &conn_->calls_being_handled_, call_->call_id()); DCHECK_EQ(call_from_map, call_.get()); } virtual void NotifyTransferFinished() OVERRIDE { delete this; } virtual void NotifyTransferAborted(const Status &status) OVERRIDE { LOG(WARNING) << "Connection torn down before " << call_->ToString() << " could send its response"; delete this; } private: unique_ptr<InboundCall> call_; Connection *conn_; }; // Reactor task which puts a transfer on the outbound transfer queue. class QueueTransferTask : public ReactorTask { public: QueueTransferTask(unique_ptr<OutboundTransfer> transfer, Connection *conn) : transfer_(std::move(transfer)), conn_(conn) {} virtual void Run(ReactorThread *thr) OVERRIDE { conn_->QueueOutbound(std::move(transfer_)); delete this; } virtual void Abort(const Status &status) OVERRIDE { transfer_->Abort(status); delete this; } private: unique_ptr<OutboundTransfer> transfer_; Connection *conn_; }; void Connection::QueueResponseForCall(unique_ptr<InboundCall> call) { // This is usually called by the IPC worker thread when the response // is set, but in some circumstances may also be called by the // reactor thread (e.g. if the service has shut down) DCHECK_EQ(direction_, SERVER); // If the connection is torn down, then the QueueOutbound() call that // eventually runs in the reactor thread will take care of calling // ResponseTransferCallbacks::NotifyTransferAborted. TransferPayload tmp_slices; call->SerializeResponseTo(&tmp_slices); TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this); // After the response is sent, can delete the InboundCall object. // We set a dummy call ID and required feature set, since these are not needed // when sending responses. unique_ptr<OutboundTransfer> t( OutboundTransfer::CreateForCallResponse(tmp_slices, cb)); QueueTransferTask *task = new QueueTransferTask(std::move(t), this); reactor_thread_->reactor()->ScheduleReactorTask(task); } void Connection::set_confidential(bool is_confidential) { is_confidential_ = is_confidential; } bool Connection::SatisfiesCredentialsPolicy(CredentialsPolicy policy) const { DCHECK_EQ(direction_, CLIENT); return (policy == CredentialsPolicy::ANY_CREDENTIALS) || (policy == credentials_policy_); } RpczStore* Connection::rpcz_store() { return reactor_thread_->reactor()->messenger()->rpcz_store(); } void Connection::ReadHandler(ev::io &watcher, int revents) { DCHECK(reactor_thread_->IsCurrentThread()); DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")"; if (revents & EV_ERROR) { reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() + ": ReadHandler encountered an error")); return; } last_activity_time_ = reactor_thread_->cur_time(); faststring extra_buf; while (true) { if (!inbound_) { inbound_.reset(new InboundTransfer()); } Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf); if (PREDICT_FALSE(!status.ok())) { if (status.posix_code() == ESHUTDOWN) { VLOG(1) << ToString() << " shut down by remote end."; } else { LOG(WARNING) << ToString() << " recv error: " << status.ToString(); } reactor_thread_->DestroyConnection(this, status); return; } if (!inbound_->TransferFinished()) { DVLOG(3) << ToString() << ": read is not yet finished yet."; return; } DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() << " bytes"; if (direction_ == CLIENT) { HandleCallResponse(std::move(inbound_)); } else if (direction_ == SERVER) { HandleIncomingCall(std::move(inbound_)); } else { LOG(FATAL) << "Invalid direction: " << direction_; } if (extra_buf.size() > 0) { inbound_.reset(new InboundTransfer(std::move(extra_buf))); } else { break; } } } void Connection::HandleIncomingCall(unique_ptr<InboundTransfer> transfer) { DCHECK(reactor_thread_->IsCurrentThread()); unique_ptr<InboundCall> call(new InboundCall(this)); Status s = call->ParseFrom(std::move(transfer)); if (!s.ok()) { LOG(WARNING) << ToString() << ": received bad data: " << s.ToString(); // TODO: shutdown? probably, since any future stuff on this socket will be // "unsynchronized" return; } if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get())) { LOG(WARNING) << ToString() << ": received call ID " << call->call_id() << " but was already processing this ID! Ignoring"; reactor_thread_->DestroyConnection( this, Status::RuntimeError("Received duplicate call id", Substitute("$0", call->call_id()))); return; } reactor_thread_->reactor()->messenger()->QueueInboundCall(std::move(call)); } void Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) { DCHECK(reactor_thread_->IsCurrentThread()); unique_ptr<CallResponse> resp(new CallResponse); CHECK_OK(resp->ParseFrom(std::move(transfer))); CallAwaitingResponse *car_ptr = EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id()); if (PREDICT_FALSE(car_ptr == nullptr)) { LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which " << "was not pending! Ignoring."; return; } // The car->timeout_timer ev::timer will be stopped automatically by its destructor. scoped_car car(car_pool_.make_scoped_ptr(car_ptr)); if (PREDICT_FALSE(!car->call)) { // The call already failed due to a timeout. VLOG(1) << "Got response to call id " << resp->call_id() << " after client " << "already timed out or cancelled"; return; } car->call->SetResponse(std::move(resp)); // Test cancellation when 'car->call' is in 'FINISHED_SUCCESS' or 'FINISHED_ERROR' state. MaybeInjectCancellation(car->call); } void Connection::WriteHandler(ev::io &watcher, int revents) { DCHECK(reactor_thread_->IsCurrentThread()); if (revents & EV_ERROR) { reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() + ": writeHandler encountered an error")); return; } DVLOG(3) << ToString() << ": writeHandler: revents = " << revents; if (outbound_transfers_.empty()) { LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is " "nothing to write."; write_io_.stop(); return; } if (ProcessOutboundTransfers() == kNoMoreToSend) { write_io_.stop(); } } Connection::ProcessOutboundTransfersResult Connection::ProcessOutboundTransfers() { while (!outbound_transfers_.empty()) { OutboundTransfer* transfer = &(outbound_transfers_.front()); if (!transfer->TransferStarted()) { if (transfer->is_for_outbound_call()) { CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id()); if (!car->call) { // If the call has already timed out or has already been cancelled, the 'call' // field would be set to NULL. In that case, don't bother sending it. outbound_transfers_.pop_front(); transfer->Abort(Status::Aborted("already timed out or cancelled")); delete transfer; continue; } // If this is the start of the transfer, then check if the server has the // required RPC flags. We have to wait until just before the transfer in // order to ensure that the negotiation has taken place, so that the flags // are available. const set<RpcFeatureFlag>& required_features = car->call->required_rpc_features(); if (!includes(remote_features_.begin(), remote_features_.end(), required_features.begin(), required_features.end())) { outbound_transfers_.pop_front(); Status s = Status::NotSupported("server does not support the required RPC features"); transfer->Abort(s); Phase phase = negotiation_complete_ ? Phase::REMOTE_CALL : Phase::CONNECTION_NEGOTIATION; car->call->SetFailed(std::move(s), phase); // Test cancellation when 'call_' is in 'FINISHED_ERROR' state. MaybeInjectCancellation(car->call); car->call.reset(); delete transfer; continue; } car->call->SetSending(); // Test cancellation when 'call_' is in 'SENDING' state. MaybeInjectCancellation(car->call); } } last_activity_time_ = reactor_thread_->cur_time(); Status status = transfer->SendBuffer(socket_.get()); if (PREDICT_FALSE(!status.ok())) { LOG(WARNING) << ToString() << " send error: " << status.ToString(); reactor_thread_->DestroyConnection(this, status); return kConnectionDestroyed; } if (!transfer->TransferFinished()) { DVLOG(3) << ToString() << ": writeHandler: xfer not finished."; return kMoreToSend; } outbound_transfers_.pop_front(); delete transfer; } return kNoMoreToSend; } std::string Connection::ToString() const { // This may be called from other threads, so we cannot // include anything in the output about the current state, // which might concurrently change from another thread. return strings::Substitute( "$0 $1", direction_ == SERVER ? "server connection from" : "client connection to", remote_.ToString()); } // Reactor task that transitions this Connection from connection negotiation to // regular RPC handling. Destroys Connection on negotiation error. class NegotiationCompletedTask : public ReactorTask { public: NegotiationCompletedTask(Connection* conn, Status negotiation_status, std::unique_ptr<ErrorStatusPB> rpc_error) : conn_(conn), negotiation_status_(std::move(negotiation_status)), rpc_error_(std::move(rpc_error)) { } virtual void Run(ReactorThread *rthread) OVERRIDE { rthread->CompleteConnectionNegotiation(conn_, negotiation_status_, std::move(rpc_error_)); delete this; } virtual void Abort(const Status &status) OVERRIDE { DCHECK(conn_->reactor_thread()->reactor()->closing()); VLOG(1) << "Failed connection negotiation due to shut down reactor thread: " << status.ToString(); delete this; } private: scoped_refptr<Connection> conn_; const Status negotiation_status_; std::unique_ptr<ErrorStatusPB> rpc_error_; }; void Connection::CompleteNegotiation(Status negotiation_status, unique_ptr<ErrorStatusPB> rpc_error) { auto task = new NegotiationCompletedTask( this, std::move(negotiation_status), std::move(rpc_error)); reactor_thread_->reactor()->ScheduleReactorTask(task); } void Connection::MarkNegotiationComplete() { DCHECK(reactor_thread_->IsCurrentThread()); negotiation_complete_ = true; } Status Connection::DumpPB(const DumpConnectionsRequestPB& req, RpcConnectionPB* resp) { DCHECK(reactor_thread_->IsCurrentThread()); resp->set_remote_ip(remote_.ToString()); if (negotiation_complete_) { resp->set_state(RpcConnectionPB::OPEN); } else { resp->set_state(RpcConnectionPB::NEGOTIATING); } if (direction_ == CLIENT) { for (const car_map_t::value_type& entry : awaiting_response_) { CallAwaitingResponse* c = entry.second; if (c->call) { c->call->DumpPB(req, resp->add_calls_in_flight()); } } resp->set_outbound_queue_size(num_queued_outbound_transfers()); } else if (direction_ == SERVER) { if (negotiation_complete_) { // It's racy to dump credentials while negotiating, since the Connection // object is owned by the negotiation thread at that point. resp->set_remote_user_credentials(remote_user_.ToString()); } for (const inbound_call_map_t::value_type& entry : calls_being_handled_) { InboundCall* c = entry.second; c->DumpPB(req, resp->add_calls_in_flight()); } } else { LOG(FATAL); } #ifdef __linux__ if (negotiation_complete_ && remote_.is_ip()) { // TODO(todd): it's a little strange to not set socket level stats during // negotiation, but we don't have access to the socket here until negotiation // is complete. WARN_NOT_OK(GetSocketStatsPB(resp->mutable_socket_stats()), "could not fill in TCP info for RPC connection"); } #endif // __linux__ if (negotiation_complete_ && remote_.is_ip()) { WARN_NOT_OK(GetTransportDetailsPB(resp->mutable_transport_details()), "could not fill in transport info for RPC connection"); } return Status::OK(); } #ifdef __linux__ Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const { DCHECK(reactor_thread_->IsCurrentThread()); int fd = socket_->GetFd(); CHECK_GE(fd, 0); // Fetch TCP_INFO statistics from the kernel. tcp_info ti; memset(&ti, 0, sizeof(ti)); socklen_t len = sizeof(ti); int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len); if (rc == 0) { # define HAS_FIELD(field_name) \ (len >= offsetof(tcp_info, field_name) + sizeof(ti.field_name)) if (!HAS_FIELD(tcpi_total_retrans)) { // All the fields up through tcpi_total_retrans were present since very old // kernel versions, beyond our minimal supported. So, we can just bail if we // don't get sufficient data back. return Status::NotSupported("bad length returned for TCP_INFO"); } pb->set_rtt(ti.tcpi_rtt); pb->set_rttvar(ti.tcpi_rttvar); pb->set_snd_cwnd(ti.tcpi_snd_cwnd); pb->set_total_retrans(ti.tcpi_total_retrans); // The following fields were added later in kernel development history. // In RHEL6 they were backported starting in 6.8. Even though they were // backported all together as a group, we'll just be safe and check for // each individually. if (HAS_FIELD(tcpi_pacing_rate)) { pb->set_pacing_rate(ti.tcpi_pacing_rate); } if (HAS_FIELD(tcpi_max_pacing_rate)) { pb->set_max_pacing_rate(ti.tcpi_max_pacing_rate); } if (HAS_FIELD(tcpi_bytes_acked)) { pb->set_bytes_acked(ti.tcpi_bytes_acked); } if (HAS_FIELD(tcpi_bytes_received)) { pb->set_bytes_received(ti.tcpi_bytes_received); } if (HAS_FIELD(tcpi_segs_out)) { pb->set_segs_out(ti.tcpi_segs_out); } if (HAS_FIELD(tcpi_segs_in)) { pb->set_segs_in(ti.tcpi_segs_in); } // Calculate sender bandwidth based on the same logic used by the 'ss' utility. if (ti.tcpi_rtt > 0 && ti.tcpi_snd_mss && ti.tcpi_snd_cwnd) { // Units: // rtt = usec // cwnd = number of MSS-size packets // mss = bytes / packet // // Dimensional analysis: // packets * bytes/packet * usecs/sec / usec -> bytes/sec static constexpr int kUsecsPerSec = 1000000; pb->set_send_bytes_per_sec(static_cast<int64_t>(ti.tcpi_snd_cwnd) * ti.tcpi_snd_mss * kUsecsPerSec / ti.tcpi_rtt); } } // Fetch the queue sizes. int queue_len = 0; rc = ioctl(fd, TIOCOUTQ, &queue_len); if (rc == 0) { pb->set_send_queue_bytes(queue_len); } rc = ioctl(fd, FIONREAD, &queue_len); if (rc == 0) { pb->set_receive_queue_bytes(queue_len); } return Status::OK(); } #endif // __linux__ Status Connection::GetTransportDetailsPB(TransportDetailsPB* pb) const { DCHECK(reactor_thread_->IsCurrentThread()); DCHECK(pb); // As for the dynamic_cast below: this is not very elegant or performant code, // but introducing a generic virtual method with vague semantics into the base // Socket class doesn't look like a good choice either. Also, the // GetTransportDetailsPB() method isn't supposed to be a part of any hot path. const TlsSocket* tls_socket = dynamic_cast<TlsSocket*>(socket_.get()); if (tls_socket) { auto* tls = pb->mutable_tls(); tls->set_protocol(tls_socket->GetProtocolName()); tls->set_cipher_suite(tls_socket->GetCipherDescription()); } int fd = socket_->GetFd(); CHECK_GE(fd, 0); int32_t max_seg_size = 0; socklen_t optlen = sizeof(max_seg_size); int ret = ::getsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &max_seg_size, &optlen); if (ret) { int err = errno; return Status::NetworkError( "getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err); } pb->mutable_tcp()->set_max_segment_size(max_seg_size); return Status::OK(); } } // namespace rpc } // namespace kudu