in lang/c++/impl/DataFile.cc [140:276]
void DataFileWriterBase::sync() {
encoderPtr_->flush();
encoderPtr_->init(*stream_);
avro::encode(*encoderPtr_, objectCount_);
if (codec_ == NULL_CODEC) {
int64_t byteCount = buffer_->byteCount();
avro::encode(*encoderPtr_, byteCount);
encoderPtr_->flush();
std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
copy(*in, *stream_);
} else if (codec_ == DEFLATE_CODEC) {
std::vector<uint8_t> buf;
{
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
if (ret != Z_OK) {
throw Exception("Failed to initialize deflate, error: {}", ret);
}
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
const uint8_t *data;
size_t len;
while (ret != Z_STREAM_END && input->next(&data, &len)) {
zs.avail_in = static_cast<uInt>(len);
zs.next_in = const_cast<Bytef *>(data);
bool flush = (zs.total_in + len) >= buffer_->byteCount();
do {
if (zs.total_out == buf.size()) {
buf.resize(buf.size() + zlibBufGrowSize);
}
zs.avail_out = static_cast<uInt>(buf.size() - zs.total_out);
zs.next_out = buf.data() + zs.total_out;
ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH);
if (ret == Z_STREAM_END) {
break;
}
if (ret != Z_OK) {
throw Exception("Failed to deflate, error: {}", ret);
}
} while (zs.avail_out == 0);
}
buf.resize(zs.total_out);
(void) deflateEnd(&zs);
} // make sure all is flushed
std::unique_ptr<InputStream> in = memoryInputStream(buf.data(), buf.size());
int64_t byteCount = buf.size();
avro::encode(*encoderPtr_, byteCount);
encoderPtr_->flush();
copy(*in, *stream_);
#ifdef SNAPPY_CODEC_AVAILABLE
} else if (codec_ == SNAPPY_CODEC) {
std::vector<char> temp;
std::string compressed;
const uint8_t *data;
size_t len;
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
while (input->next(&data, &len)) {
temp.insert(temp.end(), reinterpret_cast<const char *>(data),
reinterpret_cast<const char *>(data) + len);
}
// For Snappy, add the CRC32 checksum
auto checksum = crc32(0, reinterpret_cast<const Bytef *>(temp.data()),
static_cast<uInt>(temp.size()));
// Now compress
size_t compressed_size = snappy::Compress(
reinterpret_cast<const char *>(temp.data()), temp.size(),
&compressed);
temp.clear();
temp.insert(temp.end(), compressed.c_str(),
compressed.c_str() + compressed_size);
temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
temp.push_back(static_cast<char>(checksum & 0xFF));
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(temp.data()), temp.size());
int64_t byteCount = temp.size();
avro::encode(*encoderPtr_, byteCount);
encoderPtr_->flush();
copy(*in, *stream_);
#endif
#ifdef ZSTD_CODEC_AVAILABLE
} else if (codec_ == ZSTD_CODEC) {
// Read all uncompressed data into a single buffer
std::vector<char> uncompressed;
const uint8_t *data;
size_t len;
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
while (input->next(&data, &len)) {
uncompressed.insert(uncompressed.end(), reinterpret_cast<const char *>(data),
reinterpret_cast<const char *>(data) + len);
}
// Pre-allocate buffer for compressed data
size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
std::vector<char> compressed(max_compressed_size);
// Compress the data using ZSTD block API
size_t compressed_size = ZSTD_compress(
compressed.data(), max_compressed_size,
uncompressed.data(), uncompressed.size(),
ZSTD_CLEVEL_DEFAULT);
if (ZSTD_isError(compressed_size)) {
throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
}
compressed.resize(compressed_size);
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
encoderPtr_->flush();
copy(*in, *stream_);
#endif
}
encoderPtr_->init(*stream_);
avro::encode(*encoderPtr_, sync_);
encoderPtr_->flush();
lastSync_ = stream_->byteCount();
buffer_ = memoryOutputStream();
encoderPtr_->init(*buffer_);
objectCount_ = 0;
}