in src/native/io/compressed/writer_to_reader_channel.cpp [13:70]
/// Copies bytes from the channel's internal buffer into `buffer`.
///
/// Keeps going until `buffer` has been filled, `done_reading()` reports true,
/// or `canceled()` reports true after a wait. Each pass copies at most the
/// amount reported by `get_available_read()`, so one call may take several
/// passes (and several waits) to fill the destination.
///
/// @param buffer Destination span; at most `buffer.size()` bytes are written.
/// @return Number of bytes actually written to `buffer`. This may be less
///         than `buffer.size()` (including 0) when reading finished or was
///         canceled before the span was filled.
/// @throws errors::user_exception if, once `m_buffer_mutex` is held,
///         `get_max_available_read()` is smaller than the chunk that was
///         about to be copied.
size_t writer_to_reader_channel::read_some(std::span<char> buffer)
{
    const size_t wanted = buffer.size();
    size_t copied = 0;

    while (copied < wanted)
    {
        if (done_reading())
        {
            break;
        }

        // Poll for readable bytes; while there are none, wait and re-poll.
        auto ready = get_available_read();
        for (; ready == 0; ready = get_available_read())
        {
            if (done_reading())
            {
                return copied;
            }
            // NOTE(review): presumably blocks until a writer signals new content — confirm.
            wait_for_available_content();
            if (canceled())
            {
                return copied;
            }
        }

        // Never take more than the caller still has room for.
        const size_t chunk = std::min<size_t>(ready, wanted - copied);

        {
            std::lock_guard<std::mutex> lock(m_buffer_mutex);

            // Sanity check under the lock: the planned chunk must not exceed
            // what the channel reports as the maximum readable amount.
            auto max_to_read = get_max_available_read();
            if (chunk > max_to_read)
            {
                std::string msg = "More data is available than we expect. m_total_read: " + std::to_string(m_total_read)
                    + ", max_to_read: " + std::to_string(max_to_read);
                throw errors::user_exception(
                    errors::error_code::io_producer_consumer_reader_writer_reading_too_much_available, msg);
            }

            char* const destination = buffer.data() + copied;
            std::memcpy(destination, m_buffer.data() + m_start_offset, chunk);

            // Advance the read cursor and the running total of bytes read.
            m_start_offset += chunk;
            m_total_read += chunk;
        }

        // Signal writers after the lock is released, since the read cursor has advanced.
        notify_all_writers();
        copied += chunk;
    }

    return copied;
}