in Release/src/http/client/http_client_winhttp.cpp [1377:1683]
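// Writes the next chunk of a "Transfer-Encoding: chunked" request body: reads (and, if a
// compressor is configured, compresses) at most one chunk from the body stream, frames it
// with chunked-encoding delimiters in m_body_data, and hands it to WinHttpWriteData. It is
// invoked once per chunk until a zero-length read produces the final (empty) chunk.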
static void _transfer_encoding_chunked_write_data(_In_ winhttp_request_context* p_request_context)
{
    size_t chunk_size;
    std::unique_ptr<compression::compress_provider>& compressor = p_request_context->m_request.compressor();

    // Set the chunk size up front; we need it before the lambda functions come into scope
    if (compressor)
    {
        // We could allocate less than a chunk for the compressed data here, though that
        // would result in more trips through this path for not-so-compressible data...
        if (p_request_context->m_body_data.size() > http::details::chunked_encoding::additional_encoding_space)
        {
            // If we've previously allocated space for the compressed data, don't reduce it
            chunk_size =
                p_request_context->m_body_data.size() - http::details::chunked_encoding::additional_encoding_space;
        }
        else if (p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
        {
            // Choose a semi-intelligent size based on how much total data is left to compress
            chunk_size = (std::min)(static_cast<size_t>(p_request_context->m_remaining_to_write) + 128,
                                    p_request_context->m_http_client->client_config().chunksize());
        }
        else
        {
            // Just base our allocation on the chunk size, since we don't have any other data available
            chunk_size = p_request_context->m_http_client->client_config().chunksize();
        }
    }
    else
    {
        // We're not compressing; use the smaller of the remaining data (if known) and the configured (or
        // default) chunk size
        chunk_size = (std::min)(static_cast<size_t>(p_request_context->m_remaining_to_write),
                                p_request_context->m_http_client->client_config().chunksize());
    }
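    // The request buffer holds [chunk-size header][payload][trailing CRLF]: data_offset marks
    // where the payload begins, and additional_encoding_space reserves room for the framing.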
    p_request_context->allocate_request_space(
        nullptr, chunk_size + http::details::chunked_encoding::additional_encoding_space);
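    // Continuation run once a read (or compression pass) has placed payload bytes at
    // data_offset: it wraps them in chunked-encoding delimiters and sends the framed chunk
    // with WinHttpWriteData. A zero-byte result marks end of body and emits the final
    // zero-length chunk.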
    auto after_read = [p_request_context, chunk_size, &compressor](pplx::task<size_t> op) {
        size_t bytes_read;
        try
        {
            bytes_read = op.get();

            // If the read buffer for copying exists, write to it.
            if (p_request_context->m_readBufferCopy)
            {
                // We're writing raw memory into an in-memory stream, so waiting is safe:
                // the operation is always non-blocking.
                if (!compressor)
                {
                    p_request_context->m_readBufferCopy
                        ->putn_nocopy(
                            &p_request_context->m_body_data.get()[http::details::chunked_encoding::data_offset],
                            bytes_read)
                        .wait();
                }
            }
        }
        catch (...)
        {
            p_request_context->report_exception(std::current_exception());
            return;
        }

        _ASSERTE(bytes_read != static_cast<size_t>(-1));
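        // Frame the payload in place: the chunk-size header is written immediately ahead of
        // the data and the trailing CRLF after it, with 'offset' giving the first byte to send.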
        size_t offset = http::details::chunked_encoding::add_chunked_delimiters(
            p_request_context->m_body_data.get(),
            chunk_size + http::details::chunked_encoding::additional_encoding_space,
            bytes_read);
        if (!compressor && p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
        {
            if (bytes_read == 0 && p_request_context->m_remaining_to_write)
            {
                // The stream ended before the expected number of bytes was read
                http_exception ex(
                    U("Unexpected end of request body stream encountered before expected length met."));
                p_request_context->report_exception(ex);
                return;
            }
            p_request_context->m_remaining_to_write -= bytes_read;
        }

        // Stop writing chunks if we reached the end of the stream.
        // Note that we could detect end-of-stream based on !m_remaining_to_write, and insert
        // the last (0) chunk if we have enough extra space... though we currently don't.
        if (bytes_read == 0)
        {
            p_request_context->m_bodyType = no_body;
            if (p_request_context->m_readBufferCopy)
            {
                // Move the saved buffer into the read buffer, which now supports seeking.
                p_request_context->m_readStream =
                    concurrency::streams::container_stream<std::vector<uint8_t>>::open_istream(
                        std::move(p_request_context->m_readBufferCopy->collection()));
                p_request_context->m_readBufferCopy.reset();
            }
        }
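        // Bytes to send: the payload plus however much of the reserved framing space was
        // actually used (the region before 'offset' is unused padding).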
        const auto length = bytes_read + (http::details::chunked_encoding::additional_encoding_space - offset);
        if (!WinHttpWriteData(p_request_context->m_request_handle,
                              &p_request_context->m_body_data.get()[offset],
                              static_cast<DWORD>(length),
                              nullptr))
        {
            auto errorCode = GetLastError();
            p_request_context->report_error(errorCode, build_error_msg(errorCode, "WinHttpWriteData"));
        }
    };
    if (compressor)
    {
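        // Wraps each read in a compression pass: consumes raw bytes from the staging (or
        // acquired) buffer, tracking progress across calls in m_compression_state, and
        // produces up to chunk_size compressed bytes at data_offset for after_read to frame.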
        auto do_compress =
            [p_request_context, chunk_size, &compressor](pplx::task<size_t> op) -> pplx::task<size_t> {
            size_t bytes_read;
            try
            {
                bytes_read = op.get();
            }
            catch (...)
            {
                return pplx::task_from_exception<size_t>(std::current_exception());
            }
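            // Compress out of the streambuf's own memory if we acquired it directly;
            // otherwise use the staging buffer that getn() filled.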
            uint8_t* buffer = p_request_context->m_compression_state.m_acquired;
            if (buffer == nullptr)
            {
                buffer = p_request_context->m_compression_state.m_buffer.data();
            }

            web::http::compression::operation_hint hint = web::http::compression::operation_hint::has_more;
            if (bytes_read)
            {
                // An actual read always resets compression state for the next chunk
                _ASSERTE(p_request_context->m_compression_state.m_bytes_processed ==
                         p_request_context->m_compression_state.m_bytes_read);
                _ASSERTE(!p_request_context->m_compression_state.m_needs_flush);
                p_request_context->m_compression_state.m_bytes_read = bytes_read;
                p_request_context->m_compression_state.m_bytes_processed = 0;
                if (p_request_context->m_readBufferCopy)
                {
                    // If we've been asked to keep a copy of the raw data for restarts, do so here, pre-compression
                    p_request_context->m_readBufferCopy->putn_nocopy(buffer, bytes_read).wait();
                }
                if (p_request_context->m_remaining_to_write == bytes_read)
                {
                    // We've read to the end of the stream; finalize here if possible. We'll
                    // decrement the remaining count as we actually process the read buffer.
                    hint = web::http::compression::operation_hint::is_last;
                }
            }
            else if (p_request_context->m_compression_state.m_needs_flush)
            {
                // All input has been consumed, but we still need to collect additional compressed
                // output; this finalizing operation may run more than once before the compressor
                // is fully drained.
                hint = web::http::compression::operation_hint::is_last;
            }
            else if (p_request_context->m_compression_state.m_bytes_processed ==
                     p_request_context->m_compression_state.m_bytes_read)
            {
                if (p_request_context->m_remaining_to_write &&
                    p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
                {
                    // The stream ended before the expected number of bytes was read
                    return pplx::task_from_exception<size_t>(http_exception(
                        U("Unexpected end of request body stream encountered before expected length met.")));
                }

                // We think we're done; inform the compression library so it can finalize and/or give us
                // any pending compressed bytes. Note that we may end up here multiple times if
                // m_needs_flush is set, until all compressed bytes are drained.
                hint = web::http::compression::operation_hint::is_last;
            }
            // else we're still compressing bytes from the previous read

            _ASSERTE(p_request_context->m_compression_state.m_bytes_processed <=
                     p_request_context->m_compression_state.m_bytes_read);

            uint8_t* in = buffer + p_request_context->m_compression_state.m_bytes_processed;
            size_t inbytes = p_request_context->m_compression_state.m_bytes_read -
                             p_request_context->m_compression_state.m_bytes_processed;
            return compressor
                ->compress(in,
                           inbytes,
                           &p_request_context->m_body_data.get()[http::details::chunked_encoding::data_offset],
                           chunk_size,
                           hint)
                .then([p_request_context, bytes_read, hint, chunk_size](
                          pplx::task<http::compression::operation_result> op) -> pplx::task<size_t> {
                    http::compression::operation_result r;
                    try
                    {
                        r = op.get();
                    }
                    catch (...)
                    {
                        return pplx::task_from_exception<size_t>(std::current_exception());
                    }

                    if (hint == web::http::compression::operation_hint::is_last)
                    {
                        // We're done reading all chunks, but the compressor may still have compressed
                        // bytes to drain from previous reads
                        _ASSERTE(r.done || r.output_bytes_produced == chunk_size);
                        p_request_context->m_compression_state.m_needs_flush = !r.done;
                        p_request_context->m_compression_state.m_done = r.done;
                    }

                    // Update the number of bytes compressed in this read chunk; if it's been fully
                    // compressed, we'll reset m_bytes_processed and m_bytes_read after reading the next chunk
                    p_request_context->m_compression_state.m_bytes_processed += r.input_bytes_processed;
                    _ASSERTE(p_request_context->m_compression_state.m_bytes_processed <=
                             p_request_context->m_compression_state.m_bytes_read);
                    if (p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
                    {
                        _ASSERTE(p_request_context->m_remaining_to_write >= r.input_bytes_processed);
                        p_request_context->m_remaining_to_write -= r.input_bytes_processed;
                    }
                    if (p_request_context->m_compression_state.m_acquired != nullptr &&
                        p_request_context->m_compression_state.m_bytes_processed ==
                            p_request_context->m_compression_state.m_bytes_read)
                    {
                        // Release the acquired buffer back to the streambuf at the earliest possible point
                        p_request_context->_get_readbuffer().release(
                            p_request_context->m_compression_state.m_acquired,
                            p_request_context->m_compression_state.m_bytes_processed);
                        p_request_context->m_compression_state.m_acquired = nullptr;
                    }
                    return pplx::task_from_result<size_t>(r.output_bytes_produced);
                });
        };
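        // Decide where this pass's input comes from: leftover raw bytes from a previous read
        // (or a pending flush), a finished compressor that only needs the terminating chunk,
        // or a fresh read from the body stream.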
        if (p_request_context->m_compression_state.m_bytes_processed <
                p_request_context->m_compression_state.m_bytes_read ||
            p_request_context->m_compression_state.m_needs_flush)
        {
            // We're still working on data from a previous read; continue compression without reading new data
            do_compress(pplx::task_from_result<size_t>(0)).then(after_read);
        }
        else if (p_request_context->m_compression_state.m_done)
        {
            // We just need to send the last (zero-length) chunk; there's no sense in going through the
            // compression path
            after_read(pplx::task_from_result<size_t>(0));
        }
        else
        {
            size_t length;

            // We need to read from the input stream, then compress before sending
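            // Prefer acquire(), which exposes the streambuf's underlying memory without a
            // copy; not all streambufs support it.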
            if (p_request_context->_get_readbuffer().acquire(p_request_context->m_compression_state.m_acquired,
                                                             length))
            {
                if (length == 0)
                {
                    if (p_request_context->_get_readbuffer().exception())
                    {
                        p_request_context->report_exception(p_request_context->_get_readbuffer().exception());
                        return;
                    }
                    else if (p_request_context->m_remaining_to_write &&
                             p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
                    {
                        // Unexpected end-of-stream.
                        p_request_context->report_error(GetLastError(),
                                                        _XPLATSTR("Outgoing HTTP body stream ended early."));
                        return;
                    }
                }
                else if (length > p_request_context->m_remaining_to_write)
                {
                    // The stream grew, but we won't send more than the length we originally promised
                    length = static_cast<size_t>(p_request_context->m_remaining_to_write);
                }
                do_compress(pplx::task_from_result<size_t>(length)).then(after_read);
            }
            else
            {
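                // acquire() isn't supported here, so copy up to one chunk of raw bytes into
                // our own staging buffer instead.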
                length = (std::min)(static_cast<size_t>(p_request_context->m_remaining_to_write),
                                    p_request_context->m_http_client->client_config().chunksize());
                if (p_request_context->m_compression_state.m_buffer.capacity() < length)
                {
                    p_request_context->m_compression_state.m_buffer.reserve(length);
                }
                p_request_context->_get_readbuffer()
                    .getn(p_request_context->m_compression_state.m_buffer.data(), length)
                    .then(do_compress)
                    .then(after_read);
            }
        }
    }
    else
    {
        // We're not compressing; just read and chunk
        p_request_context->_get_readbuffer()
            .getn(&p_request_context->m_body_data.get()[http::details::chunked_encoding::data_offset], chunk_size)
            .then(after_read);
    }
}