in Release/src/websockets/client/ws_client_wspp.cpp [493:636]
void send_msg(websocket_outgoing_message& msg)
{
auto this_client = this->shared_from_this();
auto& is_buf = msg.m_body;
auto length = msg.m_length;
if (length == SIZE_MAX)
{
// This indicates we should determine the length automatically.
if (is_buf.has_size())
{
// The user's stream knows how large it is -- there's no need to buffer.
auto buf_sz = is_buf.size();
if (buf_sz >= SIZE_MAX)
{
msg.signal_body_sent(
std::make_exception_ptr(websocket_exception("Cannot send messages larger than SIZE_MAX.")));
return;
}
length = static_cast<size_t>(buf_sz);
// We have determined the length and can proceed normally.
}
else
{
// The stream needs to be buffered.
auto is_buf_istream = is_buf.create_istream();
msg.m_body = concurrency::streams::container_buffer<std::vector<uint8_t>>();
is_buf_istream.read_to_end(msg.m_body).then([this_client, msg](pplx::task<size_t> t) mutable {
try
{
msg.m_length = t.get();
this_client->send_msg(msg);
}
catch (...)
{
msg.signal_body_sent(std::current_exception());
}
});
// We have postponed the call to send_msg() until after the data is buffered.
return;
}
}
// First try to acquire the data (Get a pointer to the next already allocated contiguous block of data)
// If acquire succeeds, send the data over the socket connection, there is no copy of data from stream to
// temporary buffer. If acquire fails, copy the data to a temporary buffer managed by sp_allocated and send it
// over the socket connection.
std::shared_ptr<uint8_t> sp_allocated;
size_t acquired_size = 0;
uint8_t* ptr;
auto read_task = pplx::task_from_result();
bool acquired = is_buf.acquire(ptr, acquired_size);
if (!acquired ||
acquired_size < length) // Stream does not support acquire or failed to acquire specified number of bytes
{
// If acquire did not return the required number of bytes, do not rely on its return value.
if (acquired_size < length)
{
acquired = false;
is_buf.release(ptr, 0);
}
// Allocate buffer to hold the data to be read from the stream.
sp_allocated.reset(new uint8_t[length], [=](uint8_t* p) { delete[] p; });
read_task = is_buf.getn(sp_allocated.get(), length).then([length](size_t bytes_read) {
if (bytes_read != length)
{
throw websocket_exception("Failed to read required length of data from the stream.");
}
});
}
else
{
// Acquire succeeded, assign the acquired pointer to sp_allocated. Use an empty custom destructor
// so that the data is not released when sp_allocated goes out of scope. The streambuf will manage its
// memory.
sp_allocated.reset(ptr, [](uint8_t*) {});
}
read_task
.then([this_client, msg, sp_allocated, length]() {
std::lock_guard<std::mutex> lock(this_client->m_wspp_client_lock);
if (this_client->m_state > CONNECTED)
{
// The client has already been closed.
throw websocket_exception("Websocket connection is closed.");
}
websocketpp::lib::error_code ec;
if (this_client->m_client->is_tls_client())
{
this_client->send_msg_impl<websocketpp::config::asio_tls_client>(
this_client, msg, sp_allocated, length, ec);
}
else
{
this_client->send_msg_impl<websocketpp::config::asio_client>(
this_client, msg, sp_allocated, length, ec);
}
return ec;
})
.then([this_client, msg, is_buf, acquired, sp_allocated, length](
pplx::task<websocketpp::lib::error_code> previousTask) mutable {
std::exception_ptr eptr;
try
{
// Catch exceptions from previous tasks, if any and convert it to websocket exception.
const auto& ec = previousTask.get();
if (ec.value() != 0)
{
eptr = std::make_exception_ptr(websocket_exception(ec, build_error_msg(ec, "sending message")));
}
}
catch (...)
{
eptr = std::current_exception();
}
if (acquired)
{
is_buf.release(sp_allocated.get(), length);
}
// Set the send_task_completion_event after calling release.
if (eptr)
{
msg.signal_body_sent(eptr);
}
else
{
msg.signal_body_sent();
}
websocket_outgoing_message next_msg;
bool msg_pending = this_client->m_out_queue.pop_and_peek(next_msg);
if (msg_pending)
{
this_client->send_msg(next_msg);
}
});
}