src/rpc/network.h (168 lines of code) (raw):
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once
#include <atomic>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>
#include "rpc/message_parser.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
#include "rpc_address.h"
#include "task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/fmt_utils.h"
#include "utils/join_point.h"
#include "utils/link.h"
#include "utils/metrics.h"
#include "utils/synchronize.h"
namespace dsn {
class rpc_engine;
class service_node;
/*!
@addtogroup tool-api-providers
@{
*/
/*!
network bound to a specific rpc_channel and port (see start)
!!! all threads must be started with task::set_tls_dsn_context(provider->node(), null);
*/
class network
{
public:
//
// network factory prototype
//
template <typename T>
static network *create(rpc_engine *srv, network *inner_provider)
{
return new T(srv, inner_provider);
}
typedef network *(*factory)(rpc_engine *, network *);
public:
//
// srv - the rpc engine, could contain many networks there
// inner_provider - when not null, this network is simply a wrapper for tooling purpose (e.g.,
// tracing)
// all downcalls should be redirected to the inner provider in the end
//
network(rpc_engine *srv, network *inner_provider);
virtual ~network() {}
//
// when client_only is true, port is faked (equal to app id for tracing purpose)
//
virtual error_code start(rpc_channel channel, int port, bool client_only) = 0;
//
// the named address
//
virtual const ::dsn::rpc_address &address() const = 0;
virtual const ::dsn::host_port &host_port() const = 0;
//
// this is where the upper rpc engine calls down for a RPC call
// request - the message to be sent, all meta info (e.g., timeout, server address are
// prepared ready in its header; use message_parser to extract
// blobs from message for sending
//
virtual void send_message(message_ex *request) = 0;
//
// tools in rDSN may decide to drop this msg,
// in this case, the network should implement the appropriate
// failure model that makes this failure possible in reality
//
virtual void inject_drop_message(message_ex *msg, bool is_send) = 0;
//
// utilities
//
service_node *node() const;
//
// called when network received a complete request message
//
void on_recv_request(message_ex *msg, int delay_ms);
//
// called when network received a complete reply message or network failed,
// if network failed, the 'msg' will be nullptr
//
void on_recv_reply(uint64_t id, message_ex *msg, int delay_ms);
//
// create a message parser for
// (1) extracing blob from a RPC request message for low layer'
// (2) parsing a incoming blob message to get the rpc_message
//
message_parser *new_message_parser(network_header_format hdr_format);
rpc_engine *engine() const { return _engine; }
int max_buffer_block_count_per_send() const { return _max_buffer_block_count_per_send; }
network_header_format client_hdr_format() const { return _client_hdr_format; }
network_header_format unknown_msg_hdr_format() const { return _unknown_msg_header_format; }
int message_buffer_block_size() const { return _message_buffer_block_size; }
static uint32_t get_local_ipv4();
protected:
rpc_engine *_engine;
network_header_format _client_hdr_format;
network_header_format _unknown_msg_header_format; // default is NET_HDR_INVALID
int _message_buffer_block_size;
int _max_buffer_block_count_per_send;
private:
friend class rpc_engine;
void reset_parser_attr(network_header_format client_hdr_format, int message_buffer_block_size);
};
/*!
an incomplete network implementation for connection oriented network, e.g., TCP
*/
class connection_oriented_network : public network
{
public:
connection_oriented_network(rpc_engine *srv, network *inner_provider);
virtual ~connection_oriented_network() {}
// server session management
rpc_session_ptr get_server_session(::dsn::rpc_address ep);
void on_server_session_accepted(rpc_session_ptr &s);
void on_server_session_disconnected(rpc_session_ptr &s);
// Checks if IP of the incoming session has too much connections.
// Related config: [network] conn_threshold_per_ip. No limit if the value is 0.
bool check_if_conn_threshold_exceeded(::dsn::rpc_address ep);
// client session management
void on_client_session_connected(rpc_session_ptr &s);
void on_client_session_disconnected(rpc_session_ptr &s);
// called upon RPC call, rpc client session is created on demand
virtual void send_message(message_ex *request) override;
// called by rpc engine
virtual void inject_drop_message(message_ex *msg, bool is_send) override;
// to be defined
virtual rpc_session_ptr create_client_session(::dsn::rpc_address server_addr) = 0;
protected:
typedef std::unordered_map<::dsn::rpc_address, rpc_session_ptr> client_sessions;
client_sessions _clients; // to_address => rpc_session
utils::rw_lock_nr _clients_lock;
typedef std::unordered_map<::dsn::rpc_address, rpc_session_ptr> server_sessions;
server_sessions _servers; // from_address => rpc_session
typedef std::unordered_map<uint32_t, uint32_t> ip_connection_count;
ip_connection_count _ip_conn_count; // from_ip => connection count
utils::rw_lock_nr _servers_lock;
METRIC_VAR_DECLARE_gauge_int64(network_client_sessions);
METRIC_VAR_DECLARE_gauge_int64(network_server_sessions);
};
/*!
session managements (both client and server types)
*/
class rpc_client_matcher;
class rpc_session : public ref_counter
{
public:
/*!
@addtogroup tool-api-hooks
@{
*/
static join_point<void, rpc_session *> on_rpc_session_connected;
static join_point<void, rpc_session *> on_rpc_session_disconnected;
static join_point<bool, message_ex *> on_rpc_recv_message;
static join_point<bool, message_ex *> on_rpc_send_message;
/*@}*/
public:
rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
bool is_client);
virtual ~rpc_session();
virtual void connect() = 0;
virtual void close() = 0;
// Whether this session is launched on client side.
bool is_client() const { return _is_client; }
dsn::rpc_address remote_address() const { return _remote_addr; }
dsn::host_port remote_host_port() const { return _remote_host_port; }
connection_oriented_network &net() const { return _net; }
message_parser_ptr parser() const { return _parser; }
///
/// rpc_session's interface for sending and receiving
///
void send_message(message_ex *msg);
bool cancel(message_ex *request);
bool delay_recv(int delay_ms);
bool on_recv_message(message_ex *msg, int delay_ms);
/// ret value:
/// true - pend succeed
/// false - pend failed
bool try_pend_message(message_ex *msg);
void clear_pending_messages();
/// interfaces for security authentication,
/// you can ignore them if you don't enable auth
void set_negotiation_succeed();
bool is_negotiation_succeed() const;
void set_client_username(const std::string &user_name);
const std::string &get_client_username() const;
public:
///
/// for subclass to implement receiving message
///
void start_read_next(int read_next = 256);
// should be called in do_read() before using _parser when it is nullptr.
// returns:
// -1 : prepare failed, maybe because of invalid message header type
// 0 : prepare succeed, _parser is not nullptr now.
// >0 : need read more data, returns read_next.
int prepare_parser();
virtual void do_read(int read_next) = 0;
///
/// for subclass to implement sending message
///
// return whether there are messages for sending;
// should always be called in lock
bool unlink_message_for_send();
virtual void send(uint64_t signature) = 0;
void on_send_completed(uint64_t signature);
virtual void on_failure(bool is_write);
protected:
///
/// fields related to sending messages
///
enum session_state
{
SS_CONNECTING,
SS_CONNECTED,
SS_DISCONNECTED
};
friend USER_DEFINED_ENUM_FORMATTER(rpc_session::session_state);
mutable utils::ex_lock_nr _lock; // [
volatile session_state _connect_state;
bool negotiation_succeed = false;
// when the negotiation of a session isn't succeed,
// all messages are queued in _pending_messages.
// after connected, all of them are moved to "_messages"
std::vector<message_ex *> _pending_messages;
// messages are sent in batch, firstly all messages are linked together
// in a doubly-linked list "_messages".
// if no messages are on-the-flying, a batch of messages are fetch from the "_messages"
// and put them to _sending_msgs; meanwhile, buffers of these messages are put
// in _sending_buffers
dlink _messages;
int _message_count; // count of _messages
bool _is_sending_next;
std::vector<message_ex *> _sending_msgs;
std::vector<message_parser::send_buf> _sending_buffers;
uint64_t _message_sent;
// ]
///
/// change status and check status
///
// return true when it is permitted
bool set_connecting();
// return true when it is permitted
bool set_disconnected();
void set_connected();
void clear_send_queue(bool resend_msgs);
bool on_disconnected(bool is_write);
// constant info
connection_oriented_network &_net;
dsn::rpc_address _remote_addr;
dsn::host_port _remote_host_port;
int _max_buffer_block_count_per_send;
message_reader _reader;
message_parser_ptr _parser;
private:
const bool _is_client;
rpc_client_matcher *_matcher;
std::atomic_int _delay_server_receive_ms;
// _client_username is only valid if it is a server rpc_session.
// it represents the name of the corresponding client
std::string _client_username;
};
// --------- inline implementation --------------
// return true if delay applied.
inline bool rpc_session::delay_recv(int delay_ms)
{
bool exchanged = false;
int old_delay_ms = _delay_server_receive_ms.load();
while (!exchanged && delay_ms > old_delay_ms) {
exchanged = _delay_server_receive_ms.compare_exchange_weak(old_delay_ms, delay_ms);
}
return exchanged;
}
/*@}*/
} // namespace dsn