modules/compress.go (215 lines of code) (raw):
package module
import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"errors"
"io/ioutil"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/tunnel"
utils "github.com/alibaba/MongoShake/v2/common"
LOG "github.com/vinllen/log4go"
)
const (
NoCompress uint32 = 0
CompressWithGzip uint32 = 1
CompressWithSnappy uint32 = 2
CompressWithZlib uint32 = 3
CompressWithDeflate uint32 = 4
)
const (
BestSpeed = flate.BestSpeed
BestCompression = flate.BestCompression
NormalCompression = flate.DefaultCompression
)
var CompressLevel = BestCompression
type Compress interface {
Name() string
Id() uint32
Compress(chunk []byte) ([]byte, error)
Decompress(compressed []byte) ([]byte, error)
}
func GetCompressorByName(name string) (Compress, error) {
switch name {
case utils.VarIncrSyncWorkerOplogCompressorGzip:
return compressorGzip, nil
case utils.VarIncrSyncWorkerOplogCompressorSnappy:
return compressorSnappy, nil
case utils.VarIncrSyncWorkerOplogCompressorZlib:
return compressorZlib, nil
case utils.VarIncrSyncWorkerOplogCompressorDeflate:
return compressorDeflate, nil
case utils.VarIncrSyncWorkerOplogCompressorNone:
fallthrough
default:
return nil, errors.New("invalid compressor name")
}
}
func GetCompressorById(id uint32) (Compress, error) {
switch id {
case CompressWithGzip:
return compressorGzip, nil
case CompressWithSnappy:
return compressorSnappy, nil
case CompressWithZlib:
return compressorZlib, nil
case CompressWithDeflate:
return compressorDeflate, nil
case NoCompress:
fallthrough
default:
return nil, errors.New("invalid compressor id")
}
}
/*
* ====== Compressor =======
*
*/
type Compressor struct {
// compressor nil if compress is not enable
zipper Compress
}
func (compressor *Compressor) IsRegistered() bool {
return conf.Options.IncrSyncWorkerOplogCompressor != utils.VarIncrSyncWorkerOplogCompressorNone
}
func (compressor *Compressor) Install() bool {
var err error
if compressor.zipper, err = GetCompressorByName(conf.Options.IncrSyncWorkerOplogCompressor); err != nil {
LOG.Critical("Worker create compressor %s failed", conf.Options.IncrSyncWorkerOplogCompressor)
return false
}
// use high compress ratio by default
CompressLevel = BestCompression
return true
}
func (compressor *Compressor) Handle(message *tunnel.WMessage) int64 {
var originSize, compressedSize = 0, 0
// compress log entry data
if len(message.RawLogs) != 0 {
compressed := [][]byte{}
// every log entry compress
for _, log := range message.RawLogs {
originSize += len(log)
zipped, err := compressor.zipper.Compress(log)
if err == nil {
compressedSize += len(zipped)
compressed = append(compressed, zipped)
}
}
if compressedSize == 0 || len(compressed) != len(message.RawLogs) {
LOG.Critical("Compressor result isn't equivalent. len(compressed) %d, len(Logs) %d", len(compressed), len(message.RawLogs))
return tunnel.ReplyServerFault
}
LOG.Debug("Compressor-%s condense raw_size(%d), compress_size(%d), compress_ratio %d%%", compressor.zipper.Name(),
originSize, compressedSize, compressedSize*100/originSize)
message.Compress = compressor.zipper.Id()
message.RawLogs = compressed
} else {
message.Compress = NoCompress
}
return tunnel.ReplyOK
}
type Writable struct {
buffer *bytes.Buffer
}
type GZip struct {
Writable
}
func NewGZipCompressor() *GZip {
compressor := &GZip{Writable: Writable{buffer: new(bytes.Buffer)}}
return compressor
}
func (Gzip *GZip) Name() string {
return utils.VarIncrSyncWorkerOplogCompressorGzip
}
func (Gzip *GZip) Id() uint32 {
return CompressWithGzip
}
func (Gzip *GZip) Compress(chunk []byte) ([]byte, error) {
buffer := new(bytes.Buffer)
w, _ := gzip.NewWriterLevel(buffer, CompressLevel)
w.Write(chunk)
w.Close()
if compressed, err := ioutil.ReadAll(buffer); err == nil {
return compressed, nil
}
return nil, errors.New("GZip compress failed")
}
func (Gzip *GZip) Decompress(compressed []byte) ([]byte, error) {
r, _ := gzip.NewReader(bytes.NewBuffer(compressed))
if uncompress, err := ioutil.ReadAll(r); err == nil {
return uncompress, nil
}
return nil, errors.New("GZip uncompress failed")
}
type Snappy struct {
Writable
}
func NewSnappyCompressor() *Snappy {
return &Snappy{}
}
func (snappy *Snappy) Name() string {
return utils.VarIncrSyncWorkerOplogCompressorSnappy
}
func (snappy *Snappy) Id() uint32 {
return CompressWithSnappy
}
func (snappy *Snappy) Compress(chunk []byte) ([]byte, error) {
return chunk, nil
}
func (snappy *Snappy) Decompress(compressed []byte) ([]byte, error) {
return compressed, nil
}
type Zlib struct {
Writable
}
func NewZlibCompressor() *Zlib {
compressor := &Zlib{Writable: Writable{buffer: new(bytes.Buffer)}}
return compressor
}
func (zlib *Zlib) Name() string {
return utils.VarIncrSyncWorkerOplogCompressorZlib
}
func (zlib *Zlib) Id() uint32 {
return CompressWithZlib
}
func (ZLIB *Zlib) Compress(chunk []byte) ([]byte, error) {
buffer := new(bytes.Buffer)
w, _ := zlib.NewWriterLevel(buffer, CompressLevel)
w.Write(chunk)
w.Close()
if compressed, err := ioutil.ReadAll(buffer); err == nil {
return compressed, nil
}
return nil, errors.New("Zlib compress failed")
}
func (ZLIB *Zlib) Decompress(compressed []byte) ([]byte, error) {
r, _ := zlib.NewReader(bytes.NewBuffer(compressed))
if uncompress, err := ioutil.ReadAll(r); err == nil {
return uncompress, nil
}
return nil, errors.New("Zlib uncompress failed")
}
type Deflate struct {
Writable
}
func NewDeflateCompressor() *Deflate {
compressor := &Deflate{Writable: Writable{buffer: new(bytes.Buffer)}}
return compressor
}
func (deflate *Deflate) Name() string {
return utils.VarIncrSyncWorkerOplogCompressorDeflate
}
func (deflate *Deflate) Id() uint32 {
return CompressWithDeflate
}
func (deflate *Deflate) Compress(chunk []byte) ([]byte, error) {
buffer := new(bytes.Buffer)
w, _ := flate.NewWriter(buffer, CompressLevel)
w.Write(chunk)
w.Close()
if compressed, err := ioutil.ReadAll(buffer); err == nil {
return compressed, nil
}
return nil, errors.New("deflate compress failed")
}
func (deflate *Deflate) Decompress(compressed []byte) ([]byte, error) {
r := flate.NewReader(bytes.NewBuffer(compressed))
if uncompress, err := ioutil.ReadAll(r); err == nil {
return uncompress, nil
}
return nil, errors.New("deflate uncompressed failed")
}