in Release/src/websockets/client/ws_client_winrt.cpp [255:394]
// Sends a single outgoing message over the socket and, once that send has finished (successfully or not),
// starts sending the next message popped from m_out_queue.
//
// msg: the message to send. msg.m_body is the stream buffer that holds the payload and msg.m_length is the
//      payload length in bytes; a length of SIZE_MAX means the length must be determined here.
//
// The outcome is reported through msg.signal_body_sent(): with no argument on success, or with an
// exception_ptr describing the failure. Platform::Exception failures are converted to websocket_exception;
// every other exception is forwarded unchanged.
void send_msg(websocket_outgoing_message& msg)
{
    auto this_client = this->shared_from_this();
    auto& is_buf = msg.m_body;
    auto length = msg.m_length;

    if (length == SIZE_MAX)
    {
        // This indicates we should determine the length automatically.
        if (is_buf.has_size())
        {
            // The user's stream knows how large it is -- there's no need to buffer.
            auto buf_sz = is_buf.size();
            if (buf_sz >= SIZE_MAX)
            {
                // NOTE(review): this path returns without calling m_out_queue.pop_and_peek() -- confirm that
                // queued messages cannot stall behind a message rejected here.
                msg.signal_body_sent(
                    std::make_exception_ptr(websocket_exception("Cannot send messages larger than SIZE_MAX.")));
                return;
            }
            length = static_cast<size_t>(buf_sz);
            // We have determined the length and can proceed normally.
        }
        else
        {
            // The stream needs to be buffered: drain it into an in-memory container buffer, record the
            // number of bytes read as the message length, then re-enter send_msg() with the buffered copy.
            auto is_buf_istream = is_buf.create_istream();
            msg.m_body = concurrency::streams::container_buffer<std::vector<uint8_t>>();
            is_buf_istream.read_to_end(msg.m_body).then([this_client, msg](pplx::task<size_t> t) mutable {
                try
                {
                    msg.m_length = t.get();
                    this_client->send_msg(msg);
                }
                catch (...)
                {
                    msg.signal_body_sent(std::current_exception());
                }
            });
            // We have postponed the call to send_msg() until after the data is buffered.
            return;
        }
    }

    // First try to acquire the data (get a pointer to the next already allocated contiguous block of data).
    // If acquire succeeds, send the data over the socket connection without copying it from the stream to a
    // temporary buffer. If acquire fails, copy the data to a temporary buffer managed by sp_allocated and send
    // that over the socket connection.
    std::shared_ptr<uint8_t> sp_allocated;
    size_t acquired_size = 0;
    // Initialized so that a failed acquire() can never leave an indeterminate pointer behind.
    uint8_t* ptr = nullptr;
    auto read_task = pplx::task_from_result();
    bool acquired = is_buf.acquire(ptr, acquired_size);
    if (!acquired ||
        acquired_size < length) // Stream does not support acquire or failed to acquire specified number of bytes
    {
        if (acquired)
        {
            // acquire() succeeded but returned fewer bytes than required: hand the block back untouched and
            // fall back to copying. Only a block that was actually acquired is released.
            is_buf.release(ptr, 0);
            acquired = false;
        }
        // Allocate buffer to hold the data to be read from the stream.
        sp_allocated.reset(new uint8_t[length], [](uint8_t* p) { delete[] p; });
        read_task = is_buf.getn(sp_allocated.get(), length).then([length](size_t bytes_read) {
            if (bytes_read != length)
            {
                throw websocket_exception("Failed to read required length of data from the stream.");
            }
        });
    }
    else
    {
        // Acquire succeeded, assign the acquired pointer to sp_allocated. Use an empty custom destructor
        // so that the data is not released when sp_allocated goes out of scope. The streambuf will manage its
        // memory.
        sp_allocated.reset(ptr, [](uint8_t*) {});
    }

    read_task
        .then([this_client, sp_allocated, length]() {
            // NOTE(review): length is narrowed to unsigned int here; presumably payloads above UINT_MAX bytes
            // are not expected -- confirm, since the cast would otherwise truncate the write.
            this_client->m_messageWriter->WriteBytes(
                Platform::ArrayReference<unsigned char>(sp_allocated.get(), static_cast<unsigned int>(length)));
            // Send the data as one complete message, in WinRT we do not have an option to send fragments.
            return pplx::task<unsigned int>(this_client->m_messageWriter->StoreAsync());
        })
        .then([this_client, msg, is_buf, acquired, sp_allocated, length](
                  pplx::task<unsigned int> previousTask) mutable {
            std::exception_ptr eptr;
            unsigned int bytes_written = 0;
            try
            {
                // Observe exceptions from the previous tasks, if any.
                bytes_written = previousTask.get();
                if (bytes_written != length)
                {
                    eptr = std::make_exception_ptr(websocket_exception("Failed to send all the bytes."));
                }
            }
            catch (Platform::Exception ^ e)
            {
                // Convert to websocket_exception.
                eptr = std::make_exception_ptr(websocket_exception(e->HResult, build_error_msg(e, "send_msg")));
            }
            catch (...)
            {
                // Forward the in-flight exception itself. current_exception() keeps the dynamic type (so a
                // websocket_exception, or anything derived from it, is neither sliced nor re-wrapped).
                eptr = std::current_exception();
            }

            if (acquired)
            {
                // Return the acquired block to the stream buffer, reporting how many bytes were consumed.
                is_buf.release(sp_allocated.get(), bytes_written);
            }

            // Set the send_task_completion_event after calling release.
            if (eptr)
            {
                msg.signal_body_sent(eptr);
            }
            else
            {
                msg.signal_body_sent();
            }

            // Continue with the next message from the outgoing queue, if there is one.
            websocket_outgoing_message next_msg;
            bool msg_pending = this_client->m_out_queue.pop_and_peek(next_msg);
            if (msg_pending)
            {
                this_client->send_msg(next_msg);
            }
        });
}