void DataFileWriterBase::sync()

in lang/c++/impl/DataFile.cc [140:276]


void DataFileWriterBase::sync() {
    encoderPtr_->flush();

    encoderPtr_->init(*stream_);
    avro::encode(*encoderPtr_, objectCount_);
    if (codec_ == NULL_CODEC) {
        int64_t byteCount = buffer_->byteCount();
        avro::encode(*encoderPtr_, byteCount);
        encoderPtr_->flush();
        std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
        copy(*in, *stream_);
    } else if (codec_ == DEFLATE_CODEC) {
        std::vector<uint8_t> buf;
        {
            z_stream zs;
            zs.zalloc = Z_NULL;
            zs.zfree = Z_NULL;
            zs.opaque = Z_NULL;

            int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
            if (ret != Z_OK) {
                throw Exception("Failed to initialize deflate, error: {}", ret);
            }

            std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
            const uint8_t *data;
            size_t len;
            while (ret != Z_STREAM_END && input->next(&data, &len)) {
                zs.avail_in = static_cast<uInt>(len);
                zs.next_in = const_cast<Bytef *>(data);
                bool flush = (zs.total_in + len) >= buffer_->byteCount();
                do {
                    if (zs.total_out == buf.size()) {
                        buf.resize(buf.size() + zlibBufGrowSize);
                    }
                    zs.avail_out = static_cast<uInt>(buf.size() - zs.total_out);
                    zs.next_out = buf.data() + zs.total_out;
                    ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH);
                    if (ret == Z_STREAM_END) {
                        break;
                    }
                    if (ret != Z_OK) {
                        throw Exception("Failed to deflate, error: {}", ret);
                    }
                } while (zs.avail_out == 0);
            }

            buf.resize(zs.total_out);
            (void) deflateEnd(&zs);
        } // make sure all is flushed
        std::unique_ptr<InputStream> in = memoryInputStream(buf.data(), buf.size());
        int64_t byteCount = buf.size();
        avro::encode(*encoderPtr_, byteCount);
        encoderPtr_->flush();
        copy(*in, *stream_);
#ifdef SNAPPY_CODEC_AVAILABLE
    } else if (codec_ == SNAPPY_CODEC) {
        std::vector<char> temp;
        std::string compressed;

        const uint8_t *data;
        size_t len;
        std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
        while (input->next(&data, &len)) {
            temp.insert(temp.end(), reinterpret_cast<const char *>(data),
                        reinterpret_cast<const char *>(data) + len);
        }

        // For Snappy, add the CRC32 checksum
        auto checksum = crc32(0, reinterpret_cast<const Bytef *>(temp.data()),
                              static_cast<uInt>(temp.size()));

        // Now compress
        size_t compressed_size = snappy::Compress(
            reinterpret_cast<const char *>(temp.data()), temp.size(),
            &compressed);

        temp.clear();
        temp.insert(temp.end(), compressed.c_str(),
                    compressed.c_str() + compressed_size);

        temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
        temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
        temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
        temp.push_back(static_cast<char>(checksum & 0xFF));
        std::unique_ptr<InputStream> in = memoryInputStream(
            reinterpret_cast<const uint8_t *>(temp.data()), temp.size());
        int64_t byteCount = temp.size();
        avro::encode(*encoderPtr_, byteCount);
        encoderPtr_->flush();
        copy(*in, *stream_);
#endif
#ifdef ZSTD_CODEC_AVAILABLE
    } else if (codec_ == ZSTD_CODEC) {
        // Read all uncompressed data into a single buffer
        std::vector<char> uncompressed;
        const uint8_t *data;
        size_t len;
        std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
        while (input->next(&data, &len)) {
            uncompressed.insert(uncompressed.end(), reinterpret_cast<const char *>(data),
                                reinterpret_cast<const char *>(data) + len);
        }

        // Pre-allocate buffer for compressed data
        size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
        std::vector<char> compressed(max_compressed_size);

        // Compress the data using ZSTD block API
        size_t compressed_size = ZSTD_compress(
            compressed.data(), max_compressed_size,
            uncompressed.data(), uncompressed.size(),
            ZSTD_CLEVEL_DEFAULT);

        if (ZSTD_isError(compressed_size)) {
            throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
        }

        compressed.resize(compressed_size);
        std::unique_ptr<InputStream> in = memoryInputStream(
            reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
        avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
        encoderPtr_->flush();
        copy(*in, *stream_);
#endif
    }

    encoderPtr_->init(*stream_);
    avro::encode(*encoderPtr_, sync_);
    encoderPtr_->flush();

    lastSync_ = stream_->byteCount();

    buffer_ = memoryOutputStream();
    encoderPtr_->init(*buffer_);
    objectCount_ = 0;
}