std::string Rpc::sendStringAndGetShortReply()

in Malmo/src/TCPClient.cpp [102:219]


    /**
     * Sends a request over a new TCP connection and returns the peer's short reply.
     *
     * Steps, each started asynchronously and then waited on via error_code_sync:
     *   1. resolve ip_address:port and connect to the first resolved endpoint;
     *   2. write message, optionally preceded by a 4-byte big-endian (network order) length header;
     *   3. read a 4-byte big-endian length header, then that many bytes of reply body.
     *
     * A deadline timer (constructed from `timeout`) closes the socket when it fires, which aborts
     * whichever asynchronous operation is pending at that moment.
     *
     * @param io_service     io_service the resolver, socket and timer are bound to; per the original
     *                       comments a background worker thread is relied on to run it.
     * @param ip_address     host to connect to.
     * @param port           TCP port to connect to.
     * @param message        request body to send.
     * @param withSizeHeader when true, the request body is preceded by its 4-byte length header.
     * @return the reply body (at most 1024 bytes).
     * @throws std::runtime_error if the connect, the request write or the reply read fails,
     *         including when the reply announces a body longer than 1024 bytes.
     */
    std::string Rpc::sendStringAndGetShortReply(boost::asio::io_service& io_service, const std::string& ip_address, int port, const std::string& message, bool withSizeHeader)
    {
        const u_long MAX_PACKET_LENGTH = 1024;
        unsigned char data[MAX_PACKET_LENGTH + 1];

        tcp::resolver resolver(io_service);
        tcp::resolver::query query(ip_address, boost::lexical_cast<std::string>(port));
        tcp::socket socket(io_service);

        // Set up a deadline timer to close the socket on timeout, aborting any async io.
        // NOTE(review): the handler captures `socket` by reference; if the timer can fire while this
        // function is returning, the handler may run after `socket` is destroyed - confirm.

        boost::asio::deadline_timer deadline(io_service, timeout);

        deadline.async_wait([&](const boost::system::error_code& ec) {
            // A non-error completion means the deadline actually expired (not that it was cancelled).
            if (!ec)
            {
                LOGERROR(LT("Request/Reply communication timeout."));
                boost::system::error_code ignored_ec;
                socket.close(ignored_ec);
            }
        });

        // Attempt the resolve & connect. Relying on background worker thread to progress the async io.

        error_code_sync.init_error_code();

        resolver.async_resolve(query, [&](const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iterator) {
            if (ec)
            {
                LOGERROR(LT("Failed to resolve "), ip_address, LT(":"), port, LT(" - "), ec.message());
                error_code_sync.signal_error_code(ec);
            }
            else
            {
                if (endpoint_iterator == tcp::resolver::iterator()) {
                    // Resolution succeeded but produced no endpoints.
                    error_code_sync.signal_error_code(boost::asio::error::fault);
                }
                else {
                    // Only the first resolved endpoint is tried; the connect result is signalled directly.
                    socket.async_connect(endpoint_iterator->endpoint(), boost::bind(&ErrorCodeSync::signal_error_code, &this->error_code_sync, boost::asio::placeholders::error));
                }
            }
        });

        auto error_code = error_code_sync.await_error_code();

        if (error_code || !socket.is_open())
        {
            // A closed socket without an error code means the deadline closed it; report that as an
            // aborted operation in both the log and the exception rather than logging a success message.
            const boost::system::error_code connect_failure = error_code ? error_code : boost::asio::error::operation_aborted;
            LOGERROR(LT("Failed to connect to "), ip_address, LT(":"), port, LT(" - "), connect_failure.message());
            throw std::runtime_error("Could not connect  " + connect_failure.message());
        }

        error_code_sync.init_error_code();

        // The size header is 4 bytes containing the size of the body of the message as a network byte order integer.
        // It is encoded/decoded byte by byte so the wire format does not depend on the host's endianness or on
        // sizeof(u_long), and the buffers live at function scope so they outlive the async operations using them.
        const int SIZE_HEADER_LENGTH = 4;
        unsigned char request_header[SIZE_HEADER_LENGTH] = { 0, 0, 0, 0 };
        unsigned char reply_header[SIZE_HEADER_LENGTH] = { 0, 0, 0, 0 };

        if (withSizeHeader)
        {
            const u_long body_length = (u_long)message.size();
            request_header[0] = (unsigned char)((body_length >> 24) & 0xFF);
            request_header[1] = (unsigned char)((body_length >> 16) & 0xFF);
            request_header[2] = (unsigned char)((body_length >> 8) & 0xFF);
            request_header[3] = (unsigned char)(body_length & 0xFF);

            // Write the header first, then chain the body write from its completion handler.
            boost::asio::async_write(socket, boost::asio::buffer(request_header, SIZE_HEADER_LENGTH), [&](const boost::system::error_code& ec, std::size_t transferred) {
                if (!ec)
                {
                    boost::asio::async_write(socket, boost::asio::buffer(message), boost::bind(&Rpc::transfer_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
                }
                else
                {
                    transfer_handler(ec, transferred);
                }
            });
        }
        else
        {
            boost::asio::async_write(socket, boost::asio::buffer(message), boost::bind(&Rpc::transfer_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
        }

        error_code = error_code_sync.await_error_code();

        if (error_code)
            throw std::runtime_error("Request write failed " + error_code.message());

        error_code_sync.init_error_code();

        // Length of the reply body as announced by the reply's size header.
        u_long reply_length = 0;

        boost::asio::async_read(socket, boost::asio::buffer(reply_header, SIZE_HEADER_LENGTH), boost::asio::transfer_exactly(SIZE_HEADER_LENGTH),
            [&](const boost::system::error_code& ec, std::size_t transferred) {
                if (!ec) {
                    reply_length = ((u_long)reply_header[0] << 24)
                                 | ((u_long)reply_header[1] << 16)
                                 | ((u_long)reply_header[2] << 8)
                                 | (u_long)reply_header[3];

                    if (reply_length > MAX_PACKET_LENGTH)
                    {
                        // Refuse bodies that would not fit in the fixed-size receive buffer.
                        LOGERROR(LT("Packet length of "), reply_length, LT(" received from "), ip_address, LT(":"), port, LT(" exceeds maximum allowed."));
                        transfer_handler(boost::asio::error::fault, transferred);
                    }
                    else
                    {
                        boost::asio::async_read(socket, boost::asio::buffer(data, reply_length), boost::asio::transfer_exactly(reply_length), boost::bind(&Rpc::transfer_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
                    }
                }
                else
                {
                    transfer_handler(ec, transferred);
                }
        });

        error_code = error_code_sync.await_error_code();

        if (error_code)
            throw std::runtime_error("Response read failed " + error_code.message());

        std::string reply(data, data + reply_length);

        LOGFINE(LT("Received reply from  "), ip_address, LT(":"), port, LT(" - "), reply);

        return reply;
    }