in lang/c++/impl/DataFile.cc [419:568]
void DataFileReaderBase::readDataBlock() {
decoder_->init(*stream_);
blockStart_ = stream_->byteCount();
const uint8_t *p = nullptr;
size_t n = 0;
if (!stream_->next(&p, &n)) {
eof_ = true;
return;
}
stream_->backup(n);
avro::decode(*decoder_, objectCount_);
int64_t byteCount;
avro::decode(*decoder_, byteCount);
decoder_->init(*stream_);
blockEnd_ = stream_->byteCount() + byteCount;
unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
if (codec_ == NULL_CODEC) {
dataDecoder_->init(*st);
dataStream_ = std::move(st);
#ifdef SNAPPY_CODEC_AVAILABLE
} else if (codec_ == SNAPPY_CODEC) {
uint32_t checksum = 0;
compressed_.clear();
uncompressed.clear();
const uint8_t *data;
size_t len;
while (st->next(&data, &len)) {
compressed_.insert(compressed_.end(), data, data + len);
}
len = compressed_.size();
if (len < 4)
throw Exception("Cannot read compressed data, expected at least 4 bytes, got " + std::to_string(len));
int b1 = compressed_[len - 4] & 0xFF;
int b2 = compressed_[len - 3] & 0xFF;
int b3 = compressed_[len - 2] & 0xFF;
int b4 = compressed_[len - 1] & 0xFF;
checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
if (!snappy::Uncompress(reinterpret_cast<const char *>(compressed_.data()),
len - 4, &uncompressed)) {
throw Exception(
"Snappy Compression reported an error when decompressing");
}
auto c = crc32(0, reinterpret_cast<const Bytef *>(uncompressed.c_str()),
static_cast<uInt>(uncompressed.size()));
if (checksum != c) {
throw Exception(
"Checksum did not match for Snappy compression: Expected: {}, computed: {}",
checksum, c);
}
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
uncompressed.size());
dataDecoder_->init(*in);
dataStream_ = std::move(in);
#endif
#ifdef ZSTD_CODEC_AVAILABLE
} else if (codec_ == ZSTD_CODEC) {
compressed_.clear();
const uint8_t *data;
size_t len;
while (st->next(&data, &len)) {
compressed_.insert(compressed_.end(), data, data + len);
}
// Get the decompressed size
size_t decompressed_size = ZSTD_getFrameContentSize(
reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
throw Exception("ZSTD: Not a valid compressed frame");
} else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
throw Exception("ZSTD: Unable to determine decompressed size");
}
// Decompress the data
uncompressed.clear();
uncompressed.resize(decompressed_size);
size_t result = ZSTD_decompress(
uncompressed.data(), decompressed_size,
reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
if (ZSTD_isError(result)) {
throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
}
if (result != decompressed_size) {
throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
decompressed_size, result);
}
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(uncompressed.data()),
uncompressed.size());
dataDecoder_->init(*in);
dataStream_ = std::move(in);
#endif
} else {
compressed_.clear();
uncompressed.clear();
{
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = 0;
zs.next_in = Z_NULL;
int ret = inflateInit2(&zs, /*windowBits=*/-15);
if (ret != Z_OK) {
throw Exception("Failed to initialize inflate, error: {}", ret);
}
const uint8_t *data;
size_t len;
while (ret != Z_STREAM_END && st->next(&data, &len)) {
zs.avail_in = static_cast<uInt>(len);
zs.next_in = const_cast<Bytef *>(data);
do {
if (zs.total_out == uncompressed.size()) {
uncompressed.resize(uncompressed.size() + zlibBufGrowSize);
}
zs.avail_out = static_cast<uInt>(uncompressed.size() - zs.total_out);
zs.next_out = reinterpret_cast<Bytef *>(uncompressed.data() + zs.total_out);
ret = inflate(&zs, Z_NO_FLUSH);
if (ret == Z_STREAM_END) {
break;
}
if (ret != Z_OK) {
throw Exception("Failed to inflate, error: {}", ret);
}
} while (zs.avail_out == 0);
}
uncompressed.resize(zs.total_out);
(void) inflateEnd(&zs);
}
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
uncompressed.size());
dataDecoder_->init(*in);
dataStream_ = std::move(in);
}
}