pkg/encoding/bytes.go (239 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package encoding import ( "fmt" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" ) // EncodeBytes encodes a string into dst. func EncodeBytes(dst, b []byte) []byte { dst = VarUint64ToBytes(dst, uint64(len(b))) dst = append(dst, b...) return dst } // DecodeBytes decodes a string from src. func DecodeBytes(src []byte) ([]byte, []byte, error) { var n uint64 src, n = BytesToVarUint64(src) if uint64(len(src)) < n { return nil, nil, fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", n, len(src)) } return src[n:], src[:n], nil } // EncodeBytesBlock encodes a block of strings into dst. func EncodeBytesBlock(dst []byte, a [][]byte) []byte { u64s := GenerateUint64List(len(a)) aLens := u64s.L[:0] for _, s := range a { aLens = append(aLens, uint64(len(s))) } u64s.L = aLens dst = encodeUint64Block(dst, u64s.L) ReleaseUint64List(u64s) bb := bbPool.Generate() b := bb.Buf for _, s := range a { b = append(b, s...) } bb.Buf = b dst = compressBlock(dst, bb.Buf) bbPool.Release(bb) return dst } // BytesBlockDecoder decodes a block of strings from src. // It reuses the underlying buffer for storing the decoded strings. type BytesBlockDecoder struct { data []byte } // Reset resets the decoder. func (bbd *BytesBlockDecoder) Reset() { bbd.data = bbd.data[:0] } // Decode decodes a block of strings from src. func (bbd *BytesBlockDecoder) Decode(dst [][]byte, src []byte, itemsCount uint64) ([][]byte, error) { u64List := GenerateUint64List(0) defer ReleaseUint64List(u64List) var tail []byte var err error u64List.L, tail, err = decodeUint64Block(u64List.L[:0], src, itemsCount) if err != nil { return dst, fmt.Errorf("cannot decode string lengths: %w", err) } aLens := u64List.L src = tail dataLen := len(bbd.data) bbd.data, tail, err = decompressBlock(bbd.data, src) if err != nil { return dst, fmt.Errorf("cannot decode bytes block with strings: %w", err) } if len(tail) > 0 { return dst, fmt.Errorf("unexpected non-empty tail after reading bytes block with strings; len(tail)=%d", len(tail)) } data := bbd.data[dataLen:] for _, sLen := range aLens { if uint64(len(data)) < sLen { return dst, fmt.Errorf("cannot decode a string with the length %d bytes from %d bytes", sLen, len(data)) } if sLen == 0 { dst = append(dst, nil) continue } dst = append(dst, data[:sLen]) data = data[sLen:] } return dst, nil } func encodeUint64Block(dst []byte, a []uint64) []byte { bb := bbPool.Generate() bb.Buf = encodeUint64List(bb.Buf[:0], a) dst = compressBlock(dst, bb.Buf) bbPool.Release(bb) return dst } func decodeUint64Block(dst []uint64, src []byte, itemsCount uint64) ([]uint64, []byte, error) { bb := bbPool.Generate() defer bbPool.Release(bb) var err error bb.Buf, src, err = decompressBlock(bb.Buf[:0], src) if err != nil { return dst, src, fmt.Errorf("cannot decode bytes block: %w", err) } dst, err = decodeUint64List(dst, bb.Buf, itemsCount) if err != nil { return dst, src, fmt.Errorf("cannot decode %d uint64 items from bytes block of length %d bytes: %w", itemsCount, len(bb.Buf), err) } return dst, src, nil } const ( uintBlockType8 = 0 uintBlockType16 = 1 uintBlockType32 = 2 uintBlockType64 = 3 ) func encodeUint64List(dst []byte, a []uint64) []byte { nMax := uint64(0) for _, n := range a { if n > nMax { nMax = n } } switch { case nMax < (1 << 8): dst = append(dst, uintBlockType8) for _, n := range a { dst = append(dst, byte(n)) } case nMax < (1 << 16): dst = append(dst, uintBlockType16) for _, n := range a { dst = Uint16ToBytes(dst, uint16(n)) } case nMax < (1 << 32): dst = append(dst, uintBlockType32) for _, n := range a { dst = Uint32ToBytes(dst, uint32(n)) } default: dst = append(dst, uintBlockType64) for _, n := range a { dst = Uint64ToBytes(dst, n) } } return dst } func decodeUint64List(dst []uint64, src []byte, itemsCount uint64) ([]uint64, error) { if len(src) < 1 { return dst, fmt.Errorf("cannot decode uint64 block type from empty src") } blockType := src[0] src = src[1:] switch blockType { case uintBlockType8: if uint64(len(src)) != itemsCount { return dst, fmt.Errorf("unexpected block length for %d items; got %d bytes; want %d bytes", itemsCount, len(src), itemsCount) } for _, v := range src { dst = append(dst, uint64(v)) } case uintBlockType16: if uint64(len(src)) != 2*itemsCount { return dst, fmt.Errorf("unexpected block length for %d items; got %d bytes; want %d bytes", itemsCount, len(src), 2*itemsCount) } for len(src) > 0 { v := BytesToUint16(src) src = src[2:] dst = append(dst, uint64(v)) } case uintBlockType32: if uint64(len(src)) != 4*itemsCount { return dst, fmt.Errorf("unexpected block length for %d items; got %d bytes; want %d bytes", itemsCount, len(src), 4*itemsCount) } for len(src) > 0 { v := BytesToUint32(src) src = src[4:] dst = append(dst, uint64(v)) } case uintBlockType64: if uint64(len(src)) != 8*itemsCount { return dst, fmt.Errorf("unexpected block length for %d items; got %d bytes; want %d bytes", itemsCount, len(src), 8*itemsCount) } for len(src) > 0 { v := BytesToUint64(src) src = src[8:] dst = append(dst, v) } default: return dst, fmt.Errorf("unexpected uint64 block type: %d; want 0, 1, 2 or 3", blockType) } return dst, nil } const ( compressTypePlain = 0 compressTypeZSTD = 1 ) func compressBlock(dst, src []byte) []byte { if len(src) < 128 { dst = append(dst, compressTypePlain, byte(len(src))) return append(dst, src...) } dst = append(dst, compressTypeZSTD) bb := bbPool.Generate() bb.Buf = zstd.Compress(bb.Buf[:0], src, 1) dst = VarUint64ToBytes(dst, uint64(len(bb.Buf))) dst = append(dst, bb.Buf...) bbPool.Release(bb) return dst } func decompressBlock(dst, src []byte) ([]byte, []byte, error) { if len(src) < 1 { return dst, src, fmt.Errorf("cannot decode block type from empty src") } blockType := src[0] src = src[1:] switch blockType { case compressTypePlain: if len(src) < 1 { return dst, src, fmt.Errorf("cannot decode plain block size from empty src") } blockLen := int(src[0]) src = src[1:] if len(src) < blockLen { return dst, src, fmt.Errorf("cannot read plain block with the size %d bytes from %b bytes", blockLen, len(src)) } dst = append(dst, src[:blockLen]...) src = src[blockLen:] return dst, src, nil case compressTypeZSTD: tail, blockLen := BytesToVarUint64(src) src = tail if uint64(len(src)) < blockLen { return dst, src, fmt.Errorf("cannot read compressed block with the size %d bytes from %d bytes", blockLen, len(src)) } compressedBlock := src[:blockLen] src = src[blockLen:] // Decompress the block var err error bb := bbPool.Generate() bb.Buf, err = zstd.Decompress(bb.Buf[:0], compressedBlock) if err != nil { return dst, src, fmt.Errorf("cannot decompress block: %w", err) } // Copy the decompressed block to dst. dst = append(dst, bb.Buf...) bbPool.Release(bb) return dst, src, nil default: return dst, src, fmt.Errorf("unexpected block type: %d; supported types: 0, 1", blockType) } } var bbPool = bytes.NewBufferPool("encoding.bytesBlock")