in Malmo/src/TCPClient.cpp [102:219]
std::string Rpc::sendStringAndGetShortReply(boost::asio::io_service& io_service, const std::string& ip_address, int port, const std::string& message, bool withSizeHeader)
{
const u_long MAX_PACKET_LENGTH = 1024;
unsigned char data[MAX_PACKET_LENGTH + 1];
std::vector<unsigned char> message_vector(message.begin(), message.end());
tcp::resolver resolver(io_service);
tcp::resolver::query query(ip_address, boost::lexical_cast<std::string>(port));
tcp::socket socket(io_service);
// Set up a deadline timer to close the socket on timeout, aborting any async io.
boost::asio::deadline_timer deadline(io_service, timeout);
deadline.async_wait([&](const boost::system::error_code& ec) {
if (!ec)
{
LOGERROR(LT("Request/Reply communication timeout."));
boost::system::error_code ignored_ec;
socket.close(ignored_ec);
}
});
// Attempt the resolve & connect. Relying on background worker thread to progress the async io.
error_code_sync.init_error_code();
resolver.async_resolve(query, [&](const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iterator) {
if (ec)
{
LOGERROR(LT("Failed to resolve "), ip_address, LT(":"), port, LT(" - "), ec.message());
error_code_sync.signal_error_code(ec);
}
else
{
if (endpoint_iterator == tcp::resolver::iterator()) {
error_code_sync.signal_error_code(boost::asio::error::fault);
}
else {
socket.async_connect(endpoint_iterator->endpoint(), boost::bind(&ErrorCodeSync::signal_error_code, &this->error_code_sync, boost::asio::placeholders::error));
}
}
});
auto error_code = error_code_sync.await_error_code();
if (error_code || !socket.is_open())
{
LOGERROR(LT("Failed to connect to "), ip_address, LT(":"), port, LT(" - "), error_code.message());
throw std::runtime_error("Could not connect " + (error_code ? error_code : boost::asio::error::operation_aborted).message());
}
error_code_sync.init_error_code();
// The size header is 4 bytes containing the size of the body of the message as a network byte order integer.
const int SIZE_HEADER_LENGTH = 4;
u_long size_header;
if (withSizeHeader)
{
u_long size_header = htonl((u_long)message.size());
boost::asio::async_write(socket, boost::asio::buffer(&size_header, SIZE_HEADER_LENGTH), [&](const boost::system::error_code& ec, std::size_t transferred) {
if (!ec)
{
boost::asio::async_write(socket, boost::asio::buffer(message_vector), boost::bind(&Rpc::transfer_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
{
transfer_handler(ec, transferred);
}
});
}
else
{
boost::asio::async_write(socket, boost::asio::buffer(message_vector), boost::bind(&Rpc::transfer_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
error_code = error_code_sync.await_error_code();
if (error_code)
throw std::runtime_error("Request write failed " + error_code.message());
error_code_sync.init_error_code();
boost::asio::async_read(socket, boost::asio::buffer(&size_header, SIZE_HEADER_LENGTH), boost::asio::transfer_exactly(SIZE_HEADER_LENGTH),
[&](const boost::system::error_code& ec, std::size_t transferred) {
if (!ec) {
size_header = ntohl(size_header);
if (size_header > MAX_PACKET_LENGTH)
{
LOGERROR(LT("Packet length of "), size_header, LT(" received from "), ip_address, LT(":"), port, LT(" exceeds maximum allowed."));
transfer_handler(boost::asio::error::fault, transferred);
}
else
{
boost::asio::async_read(socket, boost::asio::buffer(data, size_header), boost::asio::transfer_exactly(size_header), boost::bind(&Rpc::transfer_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
}
else
{
transfer_handler(ec, transferred);
}
});
error_code = error_code_sync.await_error_code();
if (error_code)
throw std::runtime_error("Response read failed " + error_code.message());
std::string reply(data, data + size_header);
if (!error_code)
LOGFINE(LT("Received reply from "), ip_address, LT(":"), port, LT(" - "), reply);
return reply;
}