static void _transfer_encoding_chunked_write_data()

in Release/src/http/client/http_client_winhttp.cpp [1377:1683]


    // Sends the next chunk of a "Transfer-Encoding: chunked" request body.
    //
    // Every path through this function ends in at most one WinHttpWriteData() call, or in an
    // error/exception being reported on the request context. The steps are:
    //   1. Pick a chunk size and make sure m_body_data can hold it plus the chunk framing
    //      (chunked_encoding::additional_encoding_space).
    //   2. Obtain body bytes from the request's read buffer: straight into m_body_data when not
    //      compressing, or through an acquired/staging buffer followed by compressor->compress()
    //      when a compressor is attached to the request.
    //   3. In after_read, wrap the payload in chunk delimiters and pass it to WinHttpWriteData().
    //
    // A payload of zero bytes marks the end of the body: m_bodyType becomes no_body and, if a copy
    // of the raw body was being kept (m_readBufferCopy), that copy replaces m_readStream.
    //
    // m_remaining_to_write equal to numeric_limits<size_t>::max() is treated throughout as
    // "total length unknown".
    //
    // p_request_context: state of the request being sent; must be non-null (it is dereferenced
    //                    unconditionally).
    static void _transfer_encoding_chunked_write_data(_In_ winhttp_request_context* p_request_context)
    {
        size_t chunk_size;
        std::unique_ptr<compression::compress_provider>& compressor = p_request_context->m_request.compressor();

        // Set the chunk size up front; we need it before the lambda functions come into scope
        if (compressor)
        {
            // We could allocate less than a chunk for the compressed data here, though that
            // would result in more trips through this path for not-so-compressible data...
            if (p_request_context->m_body_data.size() > http::details::chunked_encoding::additional_encoding_space)
            {
                // If we've previously allocated space for the compressed data, don't reduce it
                chunk_size =
                    p_request_context->m_body_data.size() - http::details::chunked_encoding::additional_encoding_space;
            }
            else if (p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
            {
                // Choose a semi-intelligent size based on how much total data is left to compress
                chunk_size = (std::min)(static_cast<size_t>(p_request_context->m_remaining_to_write) + 128,
                                        p_request_context->m_http_client->client_config().chunksize());
            }
            else
            {
                // Just base our allocation on the chunk size, since we don't have any other data available
                chunk_size = p_request_context->m_http_client->client_config().chunksize();
            }
        }
        else
        {
            // We're not compressing; use the smaller of the remaining data (if known) and the configured (or default)
            // chunk size
            chunk_size = (std::min)(static_cast<size_t>(p_request_context->m_remaining_to_write),
                                    p_request_context->m_http_client->client_config().chunksize());
        }
        // Room for the payload plus the chunk header/trailer that add_chunked_delimiters() writes around it.
        p_request_context->allocate_request_space(
            nullptr, chunk_size + http::details::chunked_encoding::additional_encoding_space);

        // Final stage of every path: receives the number of payload bytes already sitting at
        // m_body_data[data_offset], frames them as a chunk and writes the chunk to WinHTTP.
        // `compressor` refers to the unique_ptr stored in the request, not to a local object.
        auto after_read = [p_request_context, chunk_size, &compressor](pplx::task<size_t> op) {
            size_t bytes_read;
            try
            {
                bytes_read = op.get();
                // If the read buffer for copying exists then write to it.
                if (p_request_context->m_readBufferCopy)
                {
                    // We have raw memory here writing to a memory stream so it is safe to wait
                    // since it will always be non-blocking.
                    // (When compressing, the raw bytes were already copied in do_compress, before compression.)
                    if (!compressor)
                    {
                        p_request_context->m_readBufferCopy
                            ->putn_nocopy(
                                &p_request_context->m_body_data.get()[http::details::chunked_encoding::data_offset],
                                bytes_read)
                            .wait();
                    }
                }
            }
            catch (...)
            {
                p_request_context->report_exception(std::current_exception());
                return;
            }

            _ASSERTE(bytes_read != static_cast<size_t>(-1));

            // Writes the chunk framing around the payload; the returned offset is where the framed
            // chunk starts inside m_body_data.
            size_t offset = http::details::chunked_encoding::add_chunked_delimiters(
                p_request_context->m_body_data.get(),
                chunk_size + http::details::chunked_encoding::additional_encoding_space,
                bytes_read);

            // With a compressor the remaining count is maintained in do_compress (in terms of
            // uncompressed input consumed), so only adjust it here for the uncompressed case.
            if (!compressor && p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
            {
                if (bytes_read == 0 && p_request_context->m_remaining_to_write)
                {
                    // The stream ended earlier than we detected it should
                    http_exception ex(
                        U("Unexpected end of request body stream encountered before expected length met."));
                    p_request_context->report_exception(ex);
                    return;
                }
                p_request_context->m_remaining_to_write -= bytes_read;
            }

            // Stop writing chunks if we reached the end of the stream.
            // Note that we could detect end-of-stream based on !m_remaining_to_write, and insert
            // the last (0) chunk if we have enough extra space... though we currently don't.
            if (bytes_read == 0)
            {
                p_request_context->m_bodyType = no_body;
                if (p_request_context->m_readBufferCopy)
                {
                    // Move the saved buffer into the read buffer, which now supports seeking.
                    p_request_context->m_readStream =
                        concurrency::streams::container_stream<std::vector<uint8_t>>::open_istream(
                            std::move(p_request_context->m_readBufferCopy->collection()));
                    p_request_context->m_readBufferCopy.reset();
                }
            }

            // Payload plus however much of the reserved framing space was actually used.
            const auto length = bytes_read + (http::details::chunked_encoding::additional_encoding_space - offset);

            if (!WinHttpWriteData(p_request_context->m_request_handle,
                                  &p_request_context->m_body_data.get()[offset],
                                  static_cast<DWORD>(length),
                                  nullptr))
            {
                auto errorCode = GetLastError();
                p_request_context->report_error(errorCode, build_error_msg(errorCode, "WinHttpWriteData"));
            }
        };

        if (compressor)
        {
            // Compression stage: receives the number of freshly read raw bytes (0 when continuing with
            // data from an earlier read or when flushing), compresses into m_body_data[data_offset] and
            // yields the number of compressed bytes produced, which after_read then sends as a chunk.
            auto do_compress =
                [p_request_context, chunk_size, &compressor](pplx::task<size_t> op) -> pplx::task<size_t> {
                size_t bytes_read;

                try
                {
                    bytes_read = op.get();
                }
                catch (...)
                {
                    return pplx::task_from_exception<size_t>(std::current_exception());
                }

                // Raw input lives either in the buffer acquired from the stream or in our staging vector.
                uint8_t* buffer = p_request_context->m_compression_state.m_acquired;
                if (buffer == nullptr)
                {
                    buffer = p_request_context->m_compression_state.m_buffer.data();
                }

                web::http::compression::operation_hint hint = web::http::compression::operation_hint::has_more;

                if (bytes_read)
                {
                    // An actual read always resets compression state for the next chunk
                    _ASSERTE(p_request_context->m_compression_state.m_bytes_processed ==
                             p_request_context->m_compression_state.m_bytes_read);
                    _ASSERTE(!p_request_context->m_compression_state.m_needs_flush);
                    p_request_context->m_compression_state.m_bytes_read = bytes_read;
                    p_request_context->m_compression_state.m_bytes_processed = 0;
                    if (p_request_context->m_readBufferCopy)
                    {
                        // If we've been asked to keep a copy of the raw data for restarts, do so here, pre-compression
                        p_request_context->m_readBufferCopy->putn_nocopy(buffer, bytes_read).wait();
                    }
                    if (p_request_context->m_remaining_to_write == bytes_read)
                    {
                        // We've read to the end of the stream; finalize here if possible.  We'll
                        // decrement the remaining count as we actually process the read buffer.
                        hint = web::http::compression::operation_hint::is_last;
                    }
                }
                else if (p_request_context->m_compression_state.m_needs_flush)
                {
                    // All input has been consumed, but we still need to collect additional compressed output;
                    // this is done (in theory it can be multiple times) as a finalizing operation
                    hint = web::http::compression::operation_hint::is_last;
                }
                else if (p_request_context->m_compression_state.m_bytes_processed ==
                         p_request_context->m_compression_state.m_bytes_read)
                {
                    if (p_request_context->m_remaining_to_write &&
                        p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
                    {
                        // The stream ended earlier than we detected it should
                        return pplx::task_from_exception<size_t>(http_exception(
                            U("Unexpected end of request body stream encountered before expected length met.")));
                    }

                    // We think we're done; inform the compression library so it can finalize and/or give us any pending
                    // compressed bytes. Note that we may end up here multiple times if m_needs_flush is set, until all
                    // compressed bytes are drained.
                    hint = web::http::compression::operation_hint::is_last;
                }
                // else we're still compressing bytes from the previous read

                _ASSERTE(p_request_context->m_compression_state.m_bytes_processed <=
                         p_request_context->m_compression_state.m_bytes_read);

                // Compress whatever part of the current raw buffer has not been consumed yet.
                uint8_t* in = buffer + p_request_context->m_compression_state.m_bytes_processed;
                size_t inbytes = p_request_context->m_compression_state.m_bytes_read -
                                 p_request_context->m_compression_state.m_bytes_processed;
                return compressor
                    ->compress(in,
                               inbytes,
                               &p_request_context->m_body_data.get()[http::details::chunked_encoding::data_offset],
                               chunk_size,
                               hint)
                    .then([p_request_context, bytes_read, hint, chunk_size](
                              pplx::task<http::compression::operation_result> op) -> pplx::task<size_t> {
                        http::compression::operation_result r;

                        try
                        {
                            r = op.get();
                        }
                        catch (...)
                        {
                            return pplx::task_from_exception<size_t>(std::current_exception());
                        }

                        if (hint == web::http::compression::operation_hint::is_last)
                        {
                            // We're done reading all chunks, but the compressor may still have compressed bytes to
                            // drain from previous reads
                            _ASSERTE(r.done || r.output_bytes_produced == chunk_size);
                            p_request_context->m_compression_state.m_needs_flush = !r.done;
                            p_request_context->m_compression_state.m_done = r.done;
                        }

                        // Update the number of bytes compressed in this read chunk; if it's been fully compressed,
                        // we'll reset m_bytes_processed and m_bytes_read after reading the next chunk
                        p_request_context->m_compression_state.m_bytes_processed += r.input_bytes_processed;
                        _ASSERTE(p_request_context->m_compression_state.m_bytes_processed <=
                                 p_request_context->m_compression_state.m_bytes_read);
                        if (p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
                        {
                            _ASSERTE(p_request_context->m_remaining_to_write >= r.input_bytes_processed);
                            p_request_context->m_remaining_to_write -= r.input_bytes_processed;
                        }

                        if (p_request_context->m_compression_state.m_acquired != nullptr &&
                            p_request_context->m_compression_state.m_bytes_processed ==
                                p_request_context->m_compression_state.m_bytes_read)
                        {
                            // Release the acquired buffer back to the streambuf at the earliest possible point
                            p_request_context->_get_readbuffer().release(
                                p_request_context->m_compression_state.m_acquired,
                                p_request_context->m_compression_state.m_bytes_processed);
                            p_request_context->m_compression_state.m_acquired = nullptr;
                        }

                        return pplx::task_from_result<size_t>(r.output_bytes_produced);
                    });
            };

            if (p_request_context->m_compression_state.m_bytes_processed <
                    p_request_context->m_compression_state.m_bytes_read ||
                p_request_context->m_compression_state.m_needs_flush)
            {
                // We're still working on data from a previous read; continue compression without reading new data
                do_compress(pplx::task_from_result<size_t>(0)).then(after_read);
            }
            else if (p_request_context->m_compression_state.m_done)
            {
                // We just need to send the last (zero-length) chunk; there's no sense in going through the compression
                // path
                after_read(pplx::task_from_result<size_t>(0));
            }
            else
            {
                size_t length;

                // We need to read from the input stream, then compress before sending
                if (p_request_context->_get_readbuffer().acquire(p_request_context->m_compression_state.m_acquired,
                                                                 length))
                {
                    if (length == 0)
                    {
                        if (p_request_context->_get_readbuffer().exception())
                        {
                            p_request_context->report_exception(p_request_context->_get_readbuffer().exception());
                            return;
                        }
                        else if (p_request_context->m_remaining_to_write &&
                                 p_request_context->m_remaining_to_write != (std::numeric_limits<size_t>::max)())
                        {
                            // Unexpected end-of-stream.
                            // NOTE(review): no WinHTTP call made by this function has failed on this path, so the
                            // value returned by GetLastError() is presumably unrelated to the condition being
                            // reported -- confirm, and consider reporting an http_exception as the other
                            // "ended early" paths in this function do.
                            p_request_context->report_error(GetLastError(),
                                                            _XPLATSTR("Outgoing HTTP body stream ended early."));
                            return;
                        }
                    }
                    else if (length > p_request_context->m_remaining_to_write)
                    {
                        // The stream grew, but we won't
                        length = static_cast<size_t>(p_request_context->m_remaining_to_write);
                    }

                    do_compress(pplx::task_from_result<size_t>(length)).then(after_read);
                }
                else
                {
                    // Direct buffer access isn't available; read into our own staging buffer instead.
                    length = (std::min)(static_cast<size_t>(p_request_context->m_remaining_to_write),
                                        p_request_context->m_http_client->client_config().chunksize());
                    // Size the staging buffer rather than merely reserving capacity: getn() writes `length`
                    // bytes through data(), and only [data(), data() + size()) is storage we may access.
                    if (p_request_context->m_compression_state.m_buffer.size() < length)
                    {
                        p_request_context->m_compression_state.m_buffer.resize(length);
                    }
                    p_request_context->_get_readbuffer()
                        .getn(p_request_context->m_compression_state.m_buffer.data(), length)
                        .then(do_compress)
                        .then(after_read);
                }
            }
        }
        else
        {
            // We're not compressing; just read and chunk
            p_request_context->_get_readbuffer()
                .getn(&p_request_context->m_body_data.get()[http::details::chunked_encoding::data_offset], chunk_size)
                .then(after_read);
        }
    }