void zstd_compression_writer::write_impl()

in src/native/io/compressed/zstd_compression_writer.cpp [54:108]


void zstd_compression_writer::write_impl(std::string_view buffer)
{
	if (buffer.size() == 0)
	{
		return;
	}

	ZSTD_inBuffer input_buffer{buffer.data(), buffer.size(), 0};

	bool done = false;

	auto new_processed_bytes = m_processed_bytes + buffer.size();
	if (new_processed_bytes > m_uncompressed_input_size)
	{
		std::string msg =
			"zstd_compression_writer::write_impl: Processed more bytes than input should have. new_processed_bytes = "
			+ std::to_string(new_processed_bytes)
			+ ", m_uncompressed_input_size = " + std::to_string(m_uncompressed_input_size);

		throw errors::user_exception(errors::error_code::io_zstd_too_much_data_processed, msg);
	}

	bool last_chunk = new_processed_bytes == m_uncompressed_input_size;
	do
	{
		ZSTD_outBuffer output_buffer{m_output_data.data(), m_output_data.capacity(), 0};

		auto op = last_chunk ? ZSTD_EndDirective::ZSTD_e_end : ZSTD_EndDirective::ZSTD_e_continue;

		size_t ret = ZSTD_compressStream2(m_zstd_cstream.get(), &output_buffer, &input_buffer, op);
		if (ZSTD_isError(ret))
		{
			auto error_name = ZSTD_getErrorName(ret);
			std::string msg = "ZSTD_compressStream2() failed. ret: " + std::to_string(ret)
			                + std::string(" error_name: ") + error_name;
			throw errors::user_exception(errors::error_code::io_zstd_compressstream2_failed, msg);
		}

		if (output_buffer.pos)
		{
			m_writer->write(std::string_view{static_cast<char *>(output_buffer.dst), output_buffer.pos});
			m_compressed_size += output_buffer.pos;
		}

		done = last_chunk ? (ret == 0) : (input_buffer.size == input_buffer.pos);
	}
	while (!done);

	m_processed_bytes = new_processed_bytes;

	if (m_processed_bytes == m_uncompressed_input_size)
	{
		flush();
	}
}