in src/native/io/compressed/zstd_compression_reader.cpp [11:77]
size_t zstd_compression_reader::read_some(std::span<char> buffer)
{
ZSTD_outBuffer out_buffer{buffer.data(), buffer.size(), 0};
size_t last_ret{};
bool retry{false};
do
{
bool last = (m_processed_bytes == m_uncompressed_input_size);
size_t ret;
if (last)
{
ret = ZSTD_compressStream2(
m_zstd_cstream.get(), &out_buffer, &m_in_zstd_buffer, ZSTD_EndDirective::ZSTD_e_end);
}
else
{
ret = ZSTD_compressStream(m_zstd_cstream.get(), &out_buffer, &m_in_zstd_buffer);
}
if (ZSTD_isError(ret))
{
throw errors::user_exception(errors::error_code::io_zstd_compress_stream_failed, ZSTD_getErrorName(ret));
}
if (last)
{
// positive value here indicates we have remaining bytes to read... but we think we're done?
if (ret > 0)
{
throw errors::user_exception(errors::error_code::io_zstd_compress_finished_early);
}
break;
}
// There wasn't enough data already present, so read some more
if ((out_buffer.size != out_buffer.pos) && (m_in_zstd_buffer.size == m_in_zstd_buffer.pos))
{
auto to_read = std::min(ret, m_in_vector.capacity());
auto from_reader = m_sequential_reader->read_some(std::span<char>{m_in_vector.data(), to_read});
m_in_zstd_buffer.size = from_reader;
m_in_zstd_buffer.pos = 0;
m_processed_bytes += from_reader;
bool maybe_incomplete = (from_reader == 0) && (last_ret == ret);
if (maybe_incomplete && retry)
{
throw errors::user_exception(errors::error_code::io_zstd_compress_cannot_finish);
}
retry = maybe_incomplete;
}
last_ret = ret;
}
while (out_buffer.size != out_buffer.pos);
m_read_offset += out_buffer.pos;
return out_buffer.pos;
}