shed/async_compression/src/decompressor.rs (78 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under both the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree and the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree.
*/
//! Non-blocking, buffered compression and decompression
use std::fmt::{self, Debug, Formatter};
use std::io::{self, BufRead, Read};
use bzip2::bufread::BzDecoder;
use flate2::bufread::GzDecoder;
use tokio_io::AsyncRead;
use zstd::Decoder as ZstdDecoder;
use crate::raw::RawDecoder;
/// A wrapper around various compression libraries that decompresses from the
/// inner [Read]er and exposes a [Read] trait API of its own. It implements
/// [AsyncRead].
pub struct Decompressor<'a, R>
where
R: AsyncRead + BufRead + 'a + Send,
{
d_type: DecompressorType,
inner: Box<dyn RawDecoder<R> + 'a + Send>,
}
/// Defines the supported decompression types
#[derive(Clone, Copy, Debug)]
pub enum DecompressorType {
/// The [bzip2] decompression
Bzip2,
/// The [flate2] decompression
Gzip,
/// The [zstd] decompression
///
/// The Zstd Decoder is overreading it's input. Consider this situation: you have a Reader that
/// returns parts of it's data compressed with Zstd and the remainder decompressed. Gzip and
/// Bzip2 will consume only the compressed bytes leaving the remainder untouched. The Zstd
/// though will consume some of the decomressed bytes, so that once you call `::into_inner()`
/// on it, the returned Reader will not contain the decomressed bytes.
///
/// Advice: use only if the entire Reader content needs to be decompressed
/// You have been warned
OverreadingZstd,
}
impl<'a, R> Decompressor<'a, R>
where
R: AsyncRead + BufRead + 'a + Send,
{
/// Creates and instance of [Decompressor] that will use the provided
/// [DecompressorType] for decompression of data read from the provided
/// [Read]er
pub fn new(r: R, dt: DecompressorType) -> Self {
Decompressor {
d_type: dt,
inner: match dt {
DecompressorType::Bzip2 => Box::new(BzDecoder::new(r)),
DecompressorType::Gzip => Box::new(GzDecoder::new(r)),
DecompressorType::OverreadingZstd => Box::new(
ZstdDecoder::with_buffer(r).expect("ZstdDecoder failed to create. Are we OOM?"),
),
},
}
}
/// Get a reference to the inner decompressor
#[inline]
pub fn get_ref(&self) -> &R {
self.inner.get_ref()
}
/// Get a mutable reference to the inner decompressor
#[inline]
pub fn get_mut(&mut self) -> &mut R {
self.inner.get_mut()
}
/// Returns the inner decompressor consuming this structure
#[inline]
pub fn into_inner(self) -> R {
self.inner.into_inner()
}
}
impl<'a, R: AsyncRead + BufRead + 'a + Send> Read for Decompressor<'a, R> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl<'a, R: AsyncRead + BufRead + 'a + Send> AsyncRead for Decompressor<'a, R> {}
impl<'a, R: AsyncRead + BufRead + 'a + Send> Debug for Decompressor<'a, R> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Decompressor")
.field("decoder_type", &self.d_type)
.finish()
}
}