shed/async_compression/src/compressor.rs (93 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under both the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree and the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree.
*/
//! Non-blocking, buffered compression.
use std::fmt::{self, Debug, Formatter};
use std::io::{self, Write};
use std::result;
use bzip2::write::BzEncoder;
use flate2::write::GzEncoder;
use futures::Poll;
use tokio_io::AsyncWrite;
use crate::decompressor::DecompressorType;
use crate::raw::{AsyncZstdEncoder, RawEncoder};
use crate::retry::retry_write;
/// Selects the compression algorithm, together with its settings, that a
/// [Compressor] uses.
#[derive(Debug, Clone, Copy)]
pub enum CompressorType {
    /// [bzip2] compression using the given [bzip2::Compression] setting.
    Bzip2(bzip2::Compression),
    /// gzip compression (through [flate2]) using the given
    /// [flate2::Compression] setting.
    Gzip(flate2::Compression),
    /// [zstd] compression.
    Zstd {
        /// Compression level handed to the zstd encoder; see
        /// [zstd::Encoder::new] for the accepted values.
        level: i32,
    },
}
impl CompressorType {
    /// Maps this compression type to the [DecompressorType] used to read its
    /// output back.
    ///
    /// The compression level carried by each variant is ignored, so only the
    /// algorithm determines the result.
    pub fn decompressor_type(self) -> DecompressorType {
        match self {
            Self::Bzip2(..) => DecompressorType::Bzip2,
            Self::Gzip(..) => DecompressorType::Gzip,
            // NOTE(review): zstd output is paired with the "overreading" zstd
            // decompressor; the rationale lives in crate::decompressor — confirm there.
            Self::Zstd { .. } => DecompressorType::OverreadingZstd,
        }
    }
}
/// A wrapper around various compression libraries that compresses the data
/// passed to it via the [Write] trait invocations and writes it further to the
/// provided writer. It implements [AsyncWrite].
///
/// The concrete encoder (bzip2, gzip or zstd) is selected by
/// [CompressorType] at construction time and stored type-erased.
pub struct Compressor<W>
where
    W: AsyncWrite + 'static,
{
    // The settings `inner` was built from. Kept so that a failed `try_finish`
    // can rebuild the wrapper, and so they can appear in the `Debug` output.
    c_type: CompressorType,
    // Type-erased encoder that owns the wrapped writer `W`. The `Send` bound
    // keeps the whole `Compressor` `Send` (see `_assert_send`).
    inner: Box<dyn RawEncoder<W> + Send>,
}
impl<W> Compressor<W>
where
W: AsyncWrite + Send + 'static,
{
/// Creates and instance of [Compressor] that will use the provided
/// [CompressorType] for compression and pass the result to the provided
/// [Write]r
pub fn new(w: W, ct: CompressorType) -> Self {
Compressor {
c_type: ct,
inner: match ct {
CompressorType::Bzip2(level) => Box::new(BzEncoder::new(w, level)),
CompressorType::Gzip(level) => Box::new(GzEncoder::new(w, level)),
CompressorType::Zstd { level } => Box::new(AsyncZstdEncoder::new(w, level)),
},
}
}
/// You need to finish the stream when you're done writing. This method
/// calls the inner compression instance to finish the compression in their
/// own way so that it might write the final data to inner [Write]r
pub fn try_finish(self) -> result::Result<W, (Self, io::Error)> {
match self.inner.try_finish() {
Ok(writer) => Ok(writer),
Err((encoder, e)) => Err((
Compressor {
c_type: self.c_type,
inner: encoder,
},
e,
)),
}
}
}
impl<W> Write for Compressor<W>
where
    W: AsyncWrite + Send,
{
    /// Hands `buf` to the inner encoder through `retry_write` and reports how
    /// many bytes were consumed.
    ///
    /// NOTE(review): the exact retry policy is defined in `crate::retry`.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        retry_write(&mut self.inner, buf)
    }

    /// Delegates flushing to the inner encoder.
    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
/// [AsyncWrite] support: writes go through the [Write] impl above, and
/// shutdown is delegated to the inner encoder.
impl<W> AsyncWrite for Compressor<W>
where
    W: AsyncWrite + Send,
{
    /// Forwards the shutdown request to the inner encoder and returns its
    /// poll result unchanged.
    ///
    /// NOTE(review): this does not visibly call `try_finish`; whether the
    /// encoder finalizes the compressed stream on shutdown depends on the
    /// `RawEncoder`/encoder implementations — confirm before relying on it.
    #[inline]
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.inner.shutdown()
    }
}
impl<W: AsyncWrite> Debug for Compressor<W> {
    /// Prints the struct as `Compressor { c_type: .. }`. The type-erased inner
    /// encoder is left out.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Compressor");
        builder.field("c_type", &self.c_type);
        builder.finish()
    }
}
/// Compile-time guard that a [Compressor] is `Send`. It is instantiated here
/// with an in-memory `Cursor<Vec<_>>` writer and bzip2 settings.
///
/// The function exists only to be type-checked.
fn _assert_send() {
    fn require_send<T: Send>(_value: T) {}

    let sink = std::io::Cursor::new(Vec::new());
    let settings = CompressorType::Bzip2(bzip2::Compression::Default);
    require_send(Compressor::new(sink, settings));
}