in banyand/measure/merger.go [269:357]
// mergeBlocks iterates over the blocks exposed by br, combines consecutive
// blocks that belong to the same series, and emits the results through bw.
//
// A single "pending" block is accumulated across iterations. It is written out
// when the next block belongs to another series, when it is full and does not
// overlap the next block in time, or when merging would exceed maxBlockLength
// or maxUncompressedBlockSize.
//
// It returns errClosed when closeCh becomes readable between two blocks, a
// wrapped error when br reports one after iteration, and otherwise the
// partMetadata populated by bw.Flush.
func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*partMetadata, error) {
	pendingBlockIsEmpty := true
	pendingBlock := generateBlockPointer()
	// Deferred arguments are evaluated here, so the pointer released is the one
	// allocated above even after pendingBlock and tmpBlock are swapped below.
	defer releaseBlockPointer(pendingBlock)
	var tmpBlock, tmpBlock2 *blockPointer

	// The decoder is acquired lazily and released each time the pending block
	// has been written.
	// NOTE(review): presumably loaded block data may reference decoder-owned
	// memory, which would explain why the decoder is kept while a merged block
	// is still pending — confirm against loadBlockData.
	var decoder *encoding.BytesBlockDecoder
	getDecoder := func() *encoding.BytesBlockDecoder {
		if decoder == nil {
			decoder = generateColumnValuesDecoder()
		}
		return decoder
	}
	releaseDecoder := func() {
		if decoder != nil {
			releaseColumnValuesDecoder(decoder)
			decoder = nil
		}
	}
	// Guarantee the decoder is handed back on every exit path, including the
	// errClosed and read-error returns. The closure is nil-safe, so this is a
	// no-op when the decoder has already been released explicitly.
	defer releaseDecoder()

	for br.nextBlockMetadata() {
		// Non-blocking check so a close request aborts the merge between blocks.
		select {
		case <-closeCh:
			return nil, errClosed
		default:
		}
		b := br.block

		// Nothing is pending: the current block becomes the pending one.
		if pendingBlockIsEmpty {
			br.loadBlockData(getDecoder())
			pendingBlock.copyFrom(b)
			pendingBlockIsEmpty = false
			continue
		}

		// The pending block cannot be merged with the current one: either the
		// series differs, or the pending block is full and ends no later than
		// the current block starts. Write it and start over with the current block.
		if pendingBlock.bm.seriesID != b.bm.seriesID ||
			(pendingBlock.isFull() && pendingBlock.bm.timestamps.max <= b.bm.timestamps.min) {
			bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
			releaseDecoder()
			br.loadBlockData(getDecoder())
			pendingBlock.copyFrom(b)
			continue
		}

		// Same series: merge the pending block and the current block into tmpBlock.
		if tmpBlock == nil {
			tmpBlock = generateBlockPointer()
			defer releaseBlockPointer(tmpBlock)
		}
		tmpBlock.reset()
		tmpBlock.bm.seriesID = b.bm.seriesID
		br.loadBlockData(getDecoder())
		mergeTwoBlocks(tmpBlock, pendingBlock, b)

		// The merged block fits both limits: it becomes the new pending block and
		// the old pending block is reused as scratch space on the next merge.
		if len(tmpBlock.timestamps) <= maxBlockLength && tmpBlock.uncompressedSizeBytes() <= maxUncompressedBlockSize {
			if len(tmpBlock.timestamps) == 0 {
				// The merge produced no rows, so there is nothing pending.
				pendingBlockIsEmpty = true
			}
			pendingBlock, tmpBlock = tmpBlock, pendingBlock
			continue
		}

		// The row count fits but the uncompressed size does not: write the merged
		// block as is and leave nothing pending.
		if len(tmpBlock.timestamps) <= maxBlockLength {
			bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock.block)
			pendingBlock.reset()
			pendingBlockIsEmpty = true
			releaseDecoder()
			continue
		}

		// Too many rows: split the merged block at maxBlockLength.
		// NOTE(review): this assumes copyFrom copies the rows starting at idx and
		// append(src, l) takes the rows before offset l — confirm against
		// blockPointer.
		tmpBlock.idx = maxBlockLength
		pendingBlock.copyFrom(tmpBlock)
		l := tmpBlock.idx
		tmpBlock.idx = 0
		if tmpBlock2 == nil {
			tmpBlock2 = generateBlockPointer()
			defer releaseBlockPointer(tmpBlock2)
		}
		tmpBlock2.reset()
		tmpBlock2.append(tmpBlock, l)
		bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock2.block)
		releaseDecoder()
	}
	if err := br.error(); err != nil {
		return nil, fmt.Errorf("cannot read block to merge: %w", err)
	}
	if !pendingBlockIsEmpty {
		bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
	}
	// Release before flushing, as the original did; the deferred call above then
	// finds nothing left to release.
	releaseDecoder()
	var result partMetadata
	bw.Flush(&result)
	return &result, nil
}