in src/native/io/compressed/zstd_compression_writer.cpp [54:108]
void zstd_compression_writer::write_impl(std::string_view buffer)
{
if (buffer.size() == 0)
{
return;
}
ZSTD_inBuffer input_buffer{buffer.data(), buffer.size(), 0};
bool done = false;
auto new_processed_bytes = m_processed_bytes + buffer.size();
if (new_processed_bytes > m_uncompressed_input_size)
{
std::string msg =
"zstd_compression_writer::write_impl: Processed more bytes than input should have. new_processed_bytes = "
+ std::to_string(new_processed_bytes)
+ ", m_uncompressed_input_size = " + std::to_string(m_uncompressed_input_size);
throw errors::user_exception(errors::error_code::io_zstd_too_much_data_processed, msg);
}
bool last_chunk = new_processed_bytes == m_uncompressed_input_size;
do
{
ZSTD_outBuffer output_buffer{m_output_data.data(), m_output_data.capacity(), 0};
auto op = last_chunk ? ZSTD_EndDirective::ZSTD_e_end : ZSTD_EndDirective::ZSTD_e_continue;
size_t ret = ZSTD_compressStream2(m_zstd_cstream.get(), &output_buffer, &input_buffer, op);
if (ZSTD_isError(ret))
{
auto error_name = ZSTD_getErrorName(ret);
std::string msg = "ZSTD_compressStream2() failed. ret: " + std::to_string(ret)
+ std::string(" error_name: ") + error_name;
throw errors::user_exception(errors::error_code::io_zstd_compressstream2_failed, msg);
}
if (output_buffer.pos)
{
m_writer->write(std::string_view{static_cast<char *>(output_buffer.dst), output_buffer.pos});
m_compressed_size += output_buffer.pos;
}
done = last_chunk ? (ret == 0) : (input_buffer.size == input_buffer.pos);
}
while (!done);
m_processed_bytes = new_processed_bytes;
if (m_processed_bytes == m_uncompressed_input_size)
{
flush();
}
}