src/rpc/network.sim.cpp (142 lines of code) (raw):
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <string.h>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "boost/asio/ip/impl/host_name.ipp"
#include "network.sim.h"
#include "runtime/node_scoper.h"
#include "task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
#include "utils/singleton_store.h"
#include "utils/utils.h"
DSN_DEFINE_uint32(tools.simulator, min_message_delay_microseconds, 1, "min message delay (us)");
DSN_DEFINE_uint32(tools.simulator,
max_message_delay_microseconds,
100000,
"max message delay (us)");
namespace dsn {
class rpc_engine;
namespace tools {
// switch[channel][header_format]
// multiple machines connect to the same switch
// 10 should be >= than rpc_channel::max_value() + 1
// 10 should be >= than network_header_format::max_value() + 1
static utils::safe_singleton_store<::dsn::rpc_address, sim_network_provider *> s_switch[10][10];
sim_client_session::sim_client_session(sim_network_provider &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser)
: rpc_session(net, remote_addr, parser, true)
{
}
void sim_client_session::connect()
{
if (set_connecting())
set_connected();
}
static message_ex *virtual_send_message(message_ex *msg)
{
std::shared_ptr<char> buffer(
dsn::utils::make_shared_array<char>(msg->header->body_length + sizeof(message_header)));
char *tmp = buffer.get();
for (auto &buf : msg->buffers) {
memcpy((void *)tmp, (const void *)buf.data(), (size_t)buf.length());
tmp += buf.length();
}
blob bb(buffer, msg->header->body_length + sizeof(message_header));
message_ex *recv_msg = message_ex::create_receive_message(bb);
recv_msg->to_address = msg->to_address;
recv_msg->to_host_port = msg->to_host_port;
msg->copy_to(*recv_msg); // extensible object state move
return recv_msg;
}
void sim_client_session::send(uint64_t sig)
{
for (auto &msg : _sending_msgs) {
sim_network_provider *rnet = nullptr;
if (!s_switch[task_spec::get(msg->local_rpc_code)->rpc_call_channel][msg->hdr_format].get(
remote_address(), rnet)) {
LOG_ERROR("cannot find destination node {} in simulator", remote_address());
// on_disconnected(); // disable this to avoid endless resending
} else {
auto server_session = rnet->get_server_session(_net.address());
if (nullptr == server_session) {
rpc_session_ptr cptr = this;
message_parser_ptr parser(_net.new_message_parser(msg->hdr_format));
server_session = new sim_server_session(*rnet, _net.address(), cptr, parser);
rnet->on_server_session_accepted(server_session);
}
message_ex *recv_msg = virtual_send_message(msg);
{
node_scoper ns(rnet->node());
CHECK(server_session->on_recv_message(recv_msg,
recv_msg->to_address ==
recv_msg->header->from_address
? 0
: rnet->net_delay_milliseconds()),
"");
}
}
}
on_send_completed(sig);
}
sim_server_session::sim_server_session(sim_network_provider &net,
::dsn::rpc_address remote_addr,
rpc_session_ptr &client,
message_parser_ptr &parser)
: rpc_session(net, remote_addr, parser, false)
{
_client = client;
}
void sim_server_session::send(uint64_t sig)
{
for (auto &msg : _sending_msgs) {
message_ex *recv_msg = virtual_send_message(msg);
{
node_scoper ns(_client->net().node());
CHECK(_client->on_recv_message(
recv_msg,
recv_msg->to_address == recv_msg->header->from_address
? 0
: (static_cast<sim_network_provider *>(&_net))->net_delay_milliseconds()),
"");
}
}
on_send_completed(sig);
}
sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provider)
: connection_oriented_network(rpc, inner_provider)
{
_address = rpc_address::from_host_port("localhost", 1);
_hp = ::dsn::host_port::from_address(_address);
LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address);
}
error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only)
{
CHECK(channel == RPC_CHANNEL_TCP || channel == RPC_CHANNEL_UDP,
"invalid given channel {}",
channel);
_address = dsn::rpc_address::from_host_port("localhost", port);
_hp = ::dsn::host_port::from_address(_address);
LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address);
auto hostname = boost::asio::ip::host_name();
if (!client_only) {
for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) {
if (s_switch[channel][i].put(_address, this)) {
s_switch[channel][i].put(dsn::rpc_address::from_host_port(hostname, port), this);
} else {
return ERR_ADDRESS_ALREADY_USED;
}
}
return ERR_OK;
} else {
return ERR_OK;
}
}
uint32_t sim_network_provider::net_delay_milliseconds() const
{
return rand::next_u32(FLAGS_min_message_delay_microseconds,
FLAGS_max_message_delay_microseconds) /
1000;
}
} // namespace tools
} // namespace dsn