pkg/encoding/encoder.go (236 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package encoding

import (
	"bytes"
	"encoding/binary"
	"io"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/apache/skywalking-banyandb/pkg/convert"
)

var (
	// encoderPool is the single package-level pool of *encoder values built by
	// newEncoder. Every encoderPoolDelegator points at this same pool, which is
	// why the delegator re-stamps name, size and fn on each Get.
	encoderPool = sync.Pool{
		New: newEncoder,
	}
	// decoderPool is the single package-level pool of *decoder values; new
	// entries start out as the zero decoder.
	decoderPool = sync.Pool{
		New: func() interface{} {
			return &decoder{}
		},
	}
	// errInvalidValue is returned by decoder.Decode when the payload is shorter
	// than the 10-byte trailer it reads.
	errInvalidValue = errors.New("invalid encoded value")
	// errNoData is wrapped and returned by decoder.Get when no decoded entry
	// carries the requested timestamp.
	errNoData = errors.New("there is no data")
)

// encoderPoolDelegator adapts the shared encoderPool to a caller-specific
// configuration; its name, size and fn are copied onto each encoder it hands out.
type encoderPoolDelegator struct {
	pool *sync.Pool    // backing pool the encoders are borrowed from
	fn   ParseInterval // derives the sampling interval from a series key
	name string        // copied onto each encoder; not otherwise read in this file
	size int           // entry count at which an encoder reports IsFull
}

// NewEncoderPool returns a SeriesEncoderPool which provides int-based xor encoders.
func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool { return &encoderPoolDelegator{ name: name, pool: &encoderPool, size: size, fn: fn, } } func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder { encoder := b.pool.Get().(*encoder) encoder.name = b.name encoder.size = b.size encoder.fn = b.fn encoder.Reset(metadata, buffer) return encoder } func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) { _, ok := seriesEncoder.(*encoder) if ok { b.pool.Put(seriesEncoder) } } type decoderPoolDelegator struct { pool *sync.Pool fn ParseInterval name string size int } // NewDecoderPool returns a SeriesDecoderPool which provides int-based xor decoders. func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool { return &decoderPoolDelegator{ name: name, pool: &decoderPool, size: size, fn: fn, } } func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder { decoder := b.pool.Get().(*decoder) decoder.name = b.name decoder.size = b.size decoder.fn = b.fn return decoder } func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) { _, ok := seriesDecoder.(*decoder) if ok { b.pool.Put(seriesDecoder) } } var _ SeriesEncoder = (*encoder)(nil) // ParseInterval parses the interval rule from the key in a kv pair. 
type ParseInterval = func(key []byte) time.Duration type encoder struct { buff BufferWriter bw *Writer values *XOREncoder fn ParseInterval name string interval time.Duration startTime uint64 prevTime uint64 num int size int } func newEncoder() interface{} { bw := NewWriter() return &encoder{ bw: bw, values: NewXOREncoder(bw), } } func (ie *encoder) Append(ts uint64, value []byte) { if len(value) > 8 { return } if ie.startTime == 0 { ie.startTime = ts ie.prevTime = ts } else if ie.startTime > ts { ie.startTime = ts } gap := int(ie.prevTime) - int(ts) if gap < 0 { return } zeroNum := gap/int(ie.interval) - 1 for i := 0; i < zeroNum; i++ { ie.bw.WriteBool(false) ie.num++ } ie.prevTime = ts l := len(value) ie.bw.WriteBool(l > 0) ie.values.Write(convert.BytesToUint64(value)) ie.num++ } func (ie *encoder) IsFull() bool { return ie.num >= ie.size } func (ie *encoder) Reset(key []byte, buffer BufferWriter) { ie.buff = buffer ie.bw.Reset(buffer) ie.interval = ie.fn(key) ie.startTime = 0 ie.prevTime = 0 ie.num = 0 ie.values = NewXOREncoder(ie.bw) } func (ie *encoder) Encode() error { ie.bw.Flush() buffWriter := NewPacker(ie.buff) buffWriter.PutUint64(ie.startTime) buffWriter.PutUint16(uint16(ie.num)) return nil } func (ie *encoder) StartTime() uint64 { return ie.startTime } var _ SeriesDecoder = (*decoder)(nil) type decoder struct { fn ParseInterval name string area []byte size int interval time.Duration startTime uint64 num int } func (i *decoder) Decode(key, data []byte) error { if len(data) < 10 { return errInvalidValue } i.interval = i.fn(key) i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2]) i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:])) i.area = data[:len(data)-10] return nil } func (i decoder) Len() int { return i.num } func (i decoder) IsFull() bool { return i.num >= i.size } func (i decoder) Get(ts uint64) ([]byte, error) { for iter := i.Iterator(); iter.Next(); { if iter.Time() == ts { return iter.Val(), nil } } return nil, 
errors.WithMessagef(errNoData, "ts:%d", ts) } func (i decoder) Range() (start, end uint64) { return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval) } func (i decoder) Iterator() SeriesIterator { br := NewReader(bytes.NewReader(i.area)) return &intIterator{ endTime: i.startTime + uint64(i.num*int(i.interval)), interval: int(i.interval), br: br, values: NewXORDecoder(br), size: i.num, } } var _ SeriesIterator = (*intIterator)(nil) type intIterator struct { err error br *Reader values *XORDecoder endTime uint64 interval int size int currVal uint64 currTime uint64 index int } func (i *intIterator) Next() bool { if i.index >= i.size { return false } var b bool var err error for !b { b, err = i.br.ReadBool() if errors.Is(err, io.EOF) { return false } if err != nil { i.err = err return false } i.index++ i.currTime = i.endTime - uint64(i.interval*i.index) } if i.values.Next() { i.currVal = i.values.Value() } return true } func (i *intIterator) Val() []byte { return convert.Uint64ToBytes(i.currVal) } func (i *intIterator) Time() uint64 { return i.currTime } func (i *intIterator) Error() error { return i.err }