banyand/stream/block_reader.go (185 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"container/heap"
"errors"
"fmt"
"io"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
type seqReader struct {
sr fs.SeqReader
r fs.Reader
bytesRead uint64
}
func (sr *seqReader) reset() {
sr.r = nil
if sr.sr != nil {
fs.MustClose(sr.sr)
}
sr.sr = nil
sr.bytesRead = 0
}
func (sr *seqReader) Path() string {
return sr.r.Path()
}
func (sr *seqReader) init(r fs.Reader) {
sr.reset()
sr.sr = r.SequentialRead()
sr.r = r
}
func (sr *seqReader) mustReadFull(data []byte) {
n, err := io.ReadFull(sr.sr, data)
if err != nil {
if errors.Is(err, io.EOF) {
return
}
logger.Panicf("cannot read data: %v", err)
}
if n != len(data) {
logger.Panicf("cannot read full data: %d/%d", n, len(data))
}
sr.bytesRead += uint64(n)
}
func generateSeqReader() *seqReader {
if v := seqReaderPool.Get(); v != nil {
return v
}
return &seqReader{}
}
func releaseSeqReader(sr *seqReader) {
sr.reset()
seqReaderPool.Put(sr)
}
var seqReaderPool = pool.Register[*seqReader]("stream-seqReader")
type seqReaders struct {
tagFamilyMetadata map[string]*seqReader
tagFamilies map[string]*seqReader
primary seqReader
timestamps seqReader
}
func (sr *seqReaders) reset() {
sr.primary.reset()
sr.timestamps.reset()
if sr.tagFamilyMetadata != nil {
for k, r := range sr.tagFamilyMetadata {
releaseSeqReader(r)
delete(sr.tagFamilyMetadata, k)
}
}
if sr.tagFamilies != nil {
for k, r := range sr.tagFamilies {
releaseSeqReader(r)
delete(sr.tagFamilies, k)
}
}
}
func (sr *seqReaders) init(p *part) {
sr.reset()
sr.primary.init(p.primary)
sr.timestamps.init(p.timestamps)
if sr.tagFamilies == nil {
sr.tagFamilies = make(map[string]*seqReader)
sr.tagFamilyMetadata = make(map[string]*seqReader)
}
for k, r := range p.tagFamilies {
sr.tagFamilies[k] = generateSeqReader()
sr.tagFamilies[k].init(r)
sr.tagFamilyMetadata[k] = generateSeqReader()
sr.tagFamilyMetadata[k].init(p.tagFamilyMetadata[k])
}
}
type blockReader struct {
err error
block *blockPointer
pih partMergeIterHeap
nextBlockNoop bool
}
func (br *blockReader) reset() {
br.block = nil
for i := range br.pih {
br.pih[i] = nil
}
br.pih = br.pih[:0]
br.nextBlockNoop = false
br.err = nil
}
func (br *blockReader) init(pii []*partMergeIter) {
br.reset()
for _, pi := range pii {
if pi.nextBlockMetadata() {
br.pih = append(br.pih, pi)
continue
}
if err := pi.error(); err != nil {
br.err = fmt.Errorf("can't get the block to merge: %w", err)
return
}
}
if len(br.pih) == 0 {
br.err = io.EOF
return
}
heap.Init(&br.pih)
br.block = &br.pih[0].block
br.nextBlockNoop = true
}
func (br *blockReader) nextBlockMetadata() bool {
if br.err != nil {
return false
}
if br.nextBlockNoop {
br.nextBlockNoop = false
return true
}
br.err = br.nextMetadata()
if br.err != nil {
if errors.Is(br.err, io.EOF) {
return false
}
br.err = fmt.Errorf("can't get the block to merge: %w", br.err)
return false
}
return true
}
func (br *blockReader) nextMetadata() error {
head := br.pih[0]
if head.nextBlockMetadata() {
heap.Fix(&br.pih, 0)
br.block = &br.pih[0].block
return nil
}
if err := head.error(); err != nil {
br.block = nil
return err
}
heap.Pop(&br.pih)
if len(br.pih) == 0 {
br.block = nil
return io.EOF
}
br.block = &br.pih[0].block
return nil
}
func (br *blockReader) loadBlockData(decoder *encoding.BytesBlockDecoder) {
br.pih[0].mustLoadBlockData(decoder, br.block)
}
func (br *blockReader) error() error {
if errors.Is(br.err, io.EOF) {
return nil
}
return br.err
}
var blockReaderPool = pool.Register[*blockReader]("stream-blockReader")
func generateBlockReader() *blockReader {
if v := blockReaderPool.Get(); v != nil {
return v
}
return &blockReader{}
}
func releaseBlockReader(br *blockReader) {
br.reset()
blockReaderPool.Put(br)
}