void send_msg()

in Release/src/websockets/client/ws_client_wspp.cpp [493:636]


    /// Sends a single outgoing message over the websocket connection and, once that message has
    /// completed (successfully or with an error), starts sending the next queued message, if any.
    ///
    /// If msg.m_length is SIZE_MAX the length is determined automatically: either from the stream
    /// buffer's reported size, or by first reading the whole stream into an in-memory buffer and
    /// then re-entering send_msg() with the now-known length.
    ///
    /// Nothing is returned. Completion and failure are reported through msg.signal_body_sent().
    ///
    /// \param msg The message to send. Its body stream buffer is read (and may be replaced by an
    ///            in-memory buffer when the length has to be discovered by buffering).
    void send_msg(websocket_outgoing_message& msg)
    {
        auto this_client = this->shared_from_this();
        auto& is_buf = msg.m_body;
        auto length = msg.m_length;

        // Pops the message that has just completed and starts the next queued one, if there is one.
        // NOTE(review): this assumes the message currently being sent is the head of m_out_queue, which is
        // how the completion continuation at the end of this function has always treated it.
        auto send_next_queued = [this_client]() {
            websocket_outgoing_message next_msg;
            bool msg_pending = this_client->m_out_queue.pop_and_peek(next_msg);

            if (msg_pending)
            {
                this_client->send_msg(next_msg);
            }
        };

        if (length == SIZE_MAX)
        {
            // This indicates we should determine the length automatically.
            if (is_buf.has_size())
            {
                // The user's stream knows how large it is -- there's no need to buffer.
                auto buf_sz = is_buf.size();
                if (buf_sz >= SIZE_MAX)
                {
                    msg.signal_body_sent(
                        std::make_exception_ptr(websocket_exception("Cannot send messages larger than SIZE_MAX.")));
                    // This message is finished (with an error). Advance the queue exactly like the normal
                    // completion path does, otherwise the failed message would stay queued and no later
                    // message would ever be started.
                    send_next_queued();
                    return;
                }
                length = static_cast<size_t>(buf_sz);
                // We have determined the length and can proceed normally.
            }
            else
            {
                // The stream needs to be buffered.
                // The input stream must be created before m_body is replaced below, because is_buf
                // is a reference to msg.m_body.
                auto is_buf_istream = is_buf.create_istream();
                msg.m_body = concurrency::streams::container_buffer<std::vector<uint8_t>>();
                is_buf_istream.read_to_end(msg.m_body)
                    .then([this_client, msg, send_next_queued](pplx::task<size_t> t) mutable {
                        try
                        {
                            msg.m_length = t.get();
                            this_client->send_msg(msg);
                        }
                        catch (...)
                        {
                            msg.signal_body_sent(std::current_exception());
                            // Buffering (or starting the send) failed, so the completion continuation of
                            // send_msg() will not run for this message. Advance the queue here instead.
                            send_next_queued();
                        }
                    });
                // We have postponed the call to send_msg() until after the data is buffered.
                return;
            }
        }

        // First try to acquire the data (Get a pointer to the next already allocated contiguous block of data)
        // If acquire succeeds, send the data over the socket connection, there is no copy of data from stream to
        // temporary buffer. If acquire fails, copy the data to a temporary buffer managed by sp_allocated and send it
        // over the socket connection.
        std::shared_ptr<uint8_t> sp_allocated;
        size_t acquired_size = 0;
        // Initialized so that the release() call below never sees an indeterminate pointer, even if the
        // stream buffer's acquire() does not write its out-parameter on failure.
        uint8_t* ptr = nullptr;
        auto read_task = pplx::task_from_result();
        bool acquired = is_buf.acquire(ptr, acquired_size);

        if (!acquired ||
            acquired_size < length) // Stream does not support acquire or failed to acquire specified number of bytes
        {
            // If acquire did not return the required number of bytes, do not rely on its return value.
            if (acquired_size < length)
            {
                acquired = false;
                is_buf.release(ptr, 0);
            }

            // Allocate buffer to hold the data to be read from the stream.
            sp_allocated.reset(new uint8_t[length], [](uint8_t* p) { delete[] p; });

            read_task = is_buf.getn(sp_allocated.get(), length).then([length](size_t bytes_read) {
                if (bytes_read != length)
                {
                    throw websocket_exception("Failed to read required length of data from the stream.");
                }
            });
        }
        else
        {
            // Acquire succeeded, assign the acquired pointer to sp_allocated. Use an empty custom destructor
            // so that the data is not released when sp_allocated goes out of scope. The streambuf will manage its
            // memory.
            sp_allocated.reset(ptr, [](uint8_t*) {});
        }

        read_task
            .then([this_client, msg, sp_allocated, length]() {
                std::lock_guard<std::mutex> lock(this_client->m_wspp_client_lock);
                if (this_client->m_state > CONNECTED)
                {
                    // The client has already been closed.
                    throw websocket_exception("Websocket connection is closed.");
                }

                websocketpp::lib::error_code ec;
                if (this_client->m_client->is_tls_client())
                {
                    this_client->send_msg_impl<websocketpp::config::asio_tls_client>(
                        this_client, msg, sp_allocated, length, ec);
                }
                else
                {
                    this_client->send_msg_impl<websocketpp::config::asio_client>(
                        this_client, msg, sp_allocated, length, ec);
                }
                return ec;
            })
            .then([msg, is_buf, acquired, sp_allocated, length, send_next_queued](
                      pplx::task<websocketpp::lib::error_code> previousTask) mutable {
                std::exception_ptr eptr;
                try
                {
                    // Catch exceptions from previous tasks, if any and convert it to websocket exception.
                    const auto& ec = previousTask.get();
                    if (ec.value() != 0)
                    {
                        eptr = std::make_exception_ptr(websocket_exception(ec, build_error_msg(ec, "sending message")));
                    }
                }
                catch (...)
                {
                    eptr = std::current_exception();
                }

                // Hand the acquired block back to the stream buffer only if the data was sent directly
                // from it; a temporary buffer is freed by sp_allocated's deleter instead.
                if (acquired)
                {
                    is_buf.release(sp_allocated.get(), length);
                }

                // Set the send_task_completion_event after calling release.
                if (eptr)
                {
                    msg.signal_body_sent(eptr);
                }
                else
                {
                    msg.signal_body_sent();
                }

                // This message is done; move on to the next queued message, if any.
                send_next_queued();
            });
    }