datahub/compress.go (188 lines of code) (raw):
package datahub
import (
"bytes"
"compress/zlib"
"io"
"github.com/pierrec/lz4"
"github.com/klauspost/compress/zstd"
)
// compress type
type CompressorType string
const (
NOCOMPRESS CompressorType = ""
LZ4 CompressorType = "lz4"
DEFLATE CompressorType = "deflate"
ZLIB CompressorType = "zlib" // Deprecated: Use DEFLATE instead.
ZSTD CompressorType = "zstd"
)
// validate that the type is valid
func validateCompressorType(ct CompressorType) bool {
switch ct {
case NOCOMPRESS, LZ4, DEFLATE, ZLIB, ZSTD:
return true
}
return false
}
func getCompressTypeFromValue(value int) CompressorType {
switch value {
case 0:
return NOCOMPRESS
case 1:
return DEFLATE
case 2:
return LZ4
case 3:
return ZLIB
case 4:
return ZSTD
default:
return NOCOMPRESS
}
}
func (ct *CompressorType) String() string {
return string(*ct)
}
func (ct *CompressorType) toValue() int {
switch *ct {
case NOCOMPRESS:
return 0
case DEFLATE:
return 1
case LZ4:
return 2
case ZLIB:
return 3
case ZSTD:
return 4
default:
return 0
}
}
// Compressor is a interface for the compress
type compressor interface {
Compress(data []byte) ([]byte, error)
DeCompress(data []byte, rawSize int64) ([]byte, error)
}
type lz4Compressor struct {
}
func (lc *lz4Compressor) Compress(data []byte) ([]byte, error) {
if len(data) == 0 {
return nil, nil
}
buf := make([]byte, lz4.CompressBlockBound(len(data)))
ht := make([]int, 64<<10)
n, err := lz4.CompressBlock(data, buf, ht)
if err != nil {
return nil, err
}
if n == 0 {
return data, nil
}
return buf[:n], nil
}
func (lc *lz4Compressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
// Allocated a very large buffer for decompression.
buf := make([]byte, rawSize)
_, err := lz4.UncompressBlock(data, buf)
if err != nil {
return nil, err
}
return buf, nil
}
type deflateCompressor struct {
}
func (dc *deflateCompressor) Compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := zlib.NewWriter(&buf)
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (dc *deflateCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
b := bytes.NewReader(data)
var buf bytes.Buffer
r, _ := zlib.NewReader(b)
if _, err := io.Copy(&buf, r); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
type zlibCompressor struct {
}
func (zc *zlibCompressor) Compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := zlib.NewWriter(&buf)
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (zc *zlibCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
b := bytes.NewReader(data)
var buf bytes.Buffer
r, _ := zlib.NewReader(b)
if _, err := io.Copy(&buf, r); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
type zstdCompressor struct {
}
func (zc *zstdCompressor) Compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
writer, err := zstd.NewWriter(&buf)
if err != nil {
return nil, err
}
if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (zc *zstdCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
reader, err := zstd.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer reader.Close()
var buf bytes.Buffer
io.Copy(&buf, reader)
return buf.Bytes(), nil
}
var compressorMap map[CompressorType]compressor = map[CompressorType]compressor{
LZ4: &lz4Compressor{},
DEFLATE: &deflateCompressor{},
ZLIB: &zlibCompressor{},
ZSTD: &zstdCompressor{},
}
func newCompressor(c CompressorType) compressor {
switch CompressorType(c) {
case LZ4:
return &lz4Compressor{}
case DEFLATE:
return &deflateCompressor{}
case ZLIB:
return &zlibCompressor{}
case ZSTD:
return &zstdCompressor{}
default:
return nil
}
}
func getCompressor(c CompressorType) compressor {
if c == NOCOMPRESS {
return nil
}
ret, ok := compressorMap[c]
if !ok {
com := newCompressor(c)
compressorMap[c] = com
}
return ret
}