size_t writer_to_reader_channel::read_some()

in src/native/io/compressed/writer_to_reader_channel.cpp [13:70]


size_t writer_to_reader_channel::read_some(std::span<char> buffer)
{
	auto remaining_to_read = buffer.size();
	size_t total_read      = 0;

	while (remaining_to_read)
	{
		if (done_reading())
		{
			return total_read;
		}

		auto available = get_available_read();
		while (available == 0)
		{
			if (done_reading())
			{
				return total_read;
			}

			// wait for a signal
			wait_for_available_content();

			if (canceled())
			{
				return total_read;
			}

			available = get_available_read();
		}

		auto to_read = std::min<size_t>(available, remaining_to_read);

		{
			std::lock_guard<std::mutex> lock(m_buffer_mutex);
			auto max_to_read = get_max_available_read();
			// we shouldn't read more than we think is available
			if (max_to_read < to_read)
			{
				std::string msg = "More data is available than we expect. m_total_read: " + std::to_string(m_total_read)
				                + ", max_to_read: " + std::to_string(max_to_read);
				throw errors::user_exception(
					errors::error_code::io_producer_consumer_reader_writer_reading_too_much_available, msg);
			}
			std::memcpy(buffer.data() + total_read, m_buffer.data() + m_start_offset, to_read);

			m_start_offset += to_read;
			m_total_read += to_read;
		}

		notify_all_writers();

		remaining_to_read -= to_read;
		total_read += to_read;
	}

	return total_read;
}