void DataFileReaderBase::readDataBlock()

in lang/c++/impl/DataFile.cc [419:568]


void DataFileReaderBase::readDataBlock() {
    decoder_->init(*stream_);
    blockStart_ = stream_->byteCount();
    const uint8_t *p = nullptr;
    size_t n = 0;
    if (!stream_->next(&p, &n)) {
        eof_ = true;
        return;
    }
    stream_->backup(n);
    avro::decode(*decoder_, objectCount_);
    int64_t byteCount;
    avro::decode(*decoder_, byteCount);
    decoder_->init(*stream_);
    blockEnd_ = stream_->byteCount() + byteCount;

    unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
    if (codec_ == NULL_CODEC) {
        dataDecoder_->init(*st);
        dataStream_ = std::move(st);
#ifdef SNAPPY_CODEC_AVAILABLE
    } else if (codec_ == SNAPPY_CODEC) {
        uint32_t checksum = 0;
        compressed_.clear();
        uncompressed.clear();
        const uint8_t *data;
        size_t len;
        while (st->next(&data, &len)) {
            compressed_.insert(compressed_.end(), data, data + len);
        }
        len = compressed_.size();
        if (len < 4)
            throw Exception("Cannot read compressed data, expected at least 4 bytes, got " + std::to_string(len));

        int b1 = compressed_[len - 4] & 0xFF;
        int b2 = compressed_[len - 3] & 0xFF;
        int b3 = compressed_[len - 2] & 0xFF;
        int b4 = compressed_[len - 1] & 0xFF;

        checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
        if (!snappy::Uncompress(reinterpret_cast<const char *>(compressed_.data()),
                                len - 4, &uncompressed)) {
            throw Exception(
                "Snappy Compression reported an error when decompressing");
        }
        auto c = crc32(0, reinterpret_cast<const Bytef *>(uncompressed.c_str()),
                       static_cast<uInt>(uncompressed.size()));
        if (checksum != c) {
            throw Exception(
                "Checksum did not match for Snappy compression: Expected: {}, computed: {}",
                checksum, c);
        }

        std::unique_ptr<InputStream> in = memoryInputStream(
            reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
            uncompressed.size());

        dataDecoder_->init(*in);
        dataStream_ = std::move(in);
#endif
#ifdef ZSTD_CODEC_AVAILABLE
    } else if (codec_ == ZSTD_CODEC) {
        compressed_.clear();
        const uint8_t *data;
        size_t len;
        while (st->next(&data, &len)) {
            compressed_.insert(compressed_.end(), data, data + len);
        }

        // Get the decompressed size
        size_t decompressed_size = ZSTD_getFrameContentSize(
            reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
        if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
            throw Exception("ZSTD: Not a valid compressed frame");
        } else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
            throw Exception("ZSTD: Unable to determine decompressed size");
        }

        // Decompress the data
        uncompressed.clear();
        uncompressed.resize(decompressed_size);
        size_t result = ZSTD_decompress(
            uncompressed.data(), decompressed_size,
            reinterpret_cast<const char *>(compressed_.data()), compressed_.size());

        if (ZSTD_isError(result)) {
            throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
        }
        if (result != decompressed_size) {
            throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
                            decompressed_size, result);
        }

        std::unique_ptr<InputStream> in = memoryInputStream(
            reinterpret_cast<const uint8_t *>(uncompressed.data()),
            uncompressed.size());

        dataDecoder_->init(*in);
        dataStream_ = std::move(in);
#endif
    } else {
        compressed_.clear();
        uncompressed.clear();

        {
            z_stream zs;
            zs.zalloc = Z_NULL;
            zs.zfree = Z_NULL;
            zs.opaque = Z_NULL;
            zs.avail_in = 0;
            zs.next_in = Z_NULL;

            int ret = inflateInit2(&zs, /*windowBits=*/-15);
            if (ret != Z_OK) {
                throw Exception("Failed to initialize inflate, error: {}", ret);
            }

            const uint8_t *data;
            size_t len;
            while (ret != Z_STREAM_END && st->next(&data, &len)) {
                zs.avail_in = static_cast<uInt>(len);
                zs.next_in = const_cast<Bytef *>(data);
                do {
                    if (zs.total_out == uncompressed.size()) {
                        uncompressed.resize(uncompressed.size() + zlibBufGrowSize);
                    }
                    zs.avail_out = static_cast<uInt>(uncompressed.size() - zs.total_out);
                    zs.next_out = reinterpret_cast<Bytef *>(uncompressed.data() + zs.total_out);
                    ret = inflate(&zs, Z_NO_FLUSH);
                    if (ret == Z_STREAM_END) {
                        break;
                    }
                    if (ret != Z_OK) {
                        throw Exception("Failed to inflate, error: {}", ret);
                    }
                } while (zs.avail_out == 0);
            }

            uncompressed.resize(zs.total_out);
            (void) inflateEnd(&zs);
        }

        std::unique_ptr<InputStream> in = memoryInputStream(
            reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
            uncompressed.size());

        dataDecoder_->init(*in);
        dataStream_ = std::move(in);
    }
}