func mergeBlocks()

in banyand/measure/merger.go [269:357]


// mergeBlocks reads every block exposed by br, merges blocks that belong
// together and writes the results through bw.
//
// A single "pending" block is carried across iterations. For each incoming
// block b the loop does one of the following:
//   - pending is empty: b becomes the pending block;
//   - b belongs to another series, or pending is full and ends at or before
//     the start of b: pending is written out and b becomes the pending block;
//   - otherwise pending and b are merged, and the merged block is either kept
//     as the new pending block, written out as a whole, or split when it
//     exceeds maxBlockLength.
//
// It returns the part metadata filled in by bw.Flush. It returns errClosed as
// soon as a receive on closeCh succeeds, and a wrapped error when br reports a
// read failure. On both error paths nothing is flushed and the result is nil.
func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*partMetadata, error) {
	pendingBlockIsEmpty := true
	pendingBlock := generateBlockPointer()
	// Deferred arguments are evaluated here, so the pointer swap performed in
	// the loop does not change which block pointers get released.
	defer releaseBlockPointer(pendingBlock)
	var tmpBlock, tmpBlock2 *blockPointer

	// The decoder is acquired lazily and may be released and re-acquired
	// several times while merging.
	var decoder *encoding.BytesBlockDecoder
	getDecoder := func() *encoding.BytesBlockDecoder {
		if decoder == nil {
			decoder = generateColumnValuesDecoder()
		}
		return decoder
	}
	releaseDecoder := func() {
		if decoder != nil {
			releaseColumnValuesDecoder(decoder)
			decoder = nil
		}
	}
	// releaseDecoder is a no-op when no decoder is held, so deferring it
	// guarantees the decoder is handed back on every exit path, including the
	// errClosed and read-error returns below.
	defer releaseDecoder()

	for br.nextBlockMetadata() {
		// Non-blocking check so a merge can be abandoned between blocks.
		select {
		case <-closeCh:
			return nil, errClosed
		default:
		}
		b := br.block

		if pendingBlockIsEmpty {
			// Nothing is pending: load b and make it the pending block.
			br.loadBlockData(getDecoder())
			pendingBlock.copyFrom(b)
			pendingBlockIsEmpty = false
			continue
		}

		if pendingBlock.bm.seriesID != b.bm.seriesID ||
			(pendingBlock.isFull() && pendingBlock.bm.timestamps.max <= b.bm.timestamps.min) {
			// b cannot be merged into the pending block: it is another series,
			// or the pending block is full and does not extend past b's start.
			// Write the pending block and start over from b.
			bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
			// NOTE(review): the decoder is swapped here, presumably so decoded
			// values of the written block are not kept around — confirm.
			releaseDecoder()
			br.loadBlockData(getDecoder())
			pendingBlock.copyFrom(b)
			continue
		}

		// Same series: merge pending and b into tmpBlock.
		if tmpBlock == nil {
			// Guarded by the nil check, so this defer is registered only once.
			tmpBlock = generateBlockPointer()
			defer releaseBlockPointer(tmpBlock)
		}
		tmpBlock.reset()
		tmpBlock.bm.seriesID = b.bm.seriesID
		br.loadBlockData(getDecoder())
		mergeTwoBlocks(tmpBlock, pendingBlock, b)
		if len(tmpBlock.timestamps) <= maxBlockLength && tmpBlock.uncompressedSizeBytes() <= maxUncompressedBlockSize {
			// The merged block is within both limits: keep it as the pending
			// block so later blocks can still be merged into it.
			if len(tmpBlock.timestamps) == 0 {
				pendingBlockIsEmpty = true
			}
			pendingBlock, tmpBlock = tmpBlock, pendingBlock
			continue
		}

		if len(tmpBlock.timestamps) <= maxBlockLength {
			// Row count is fine but the uncompressed size limit is exceeded:
			// write the merged block as is and start with an empty pending block.
			bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock.block)
			pendingBlock.reset()
			pendingBlockIsEmpty = true
			releaseDecoder()
			continue
		}

		// More than maxBlockLength rows: split the merged block.
		// NOTE(review): this relies on copyFrom taking the rows starting at
		// tmpBlock.idx and append taking the first l rows — confirm against
		// the blockPointer implementation.
		tmpBlock.idx = maxBlockLength
		pendingBlock.copyFrom(tmpBlock)
		l := tmpBlock.idx
		tmpBlock.idx = 0
		if tmpBlock2 == nil {
			// Guarded by the nil check, so this defer is registered only once.
			tmpBlock2 = generateBlockPointer()
			defer releaseBlockPointer(tmpBlock2)
		}
		tmpBlock2.reset()
		tmpBlock2.append(tmpBlock, l)
		bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock2.block)
		releaseDecoder()
	}
	if err := br.error(); err != nil {
		return nil, fmt.Errorf("cannot read block to merge: %w", err)
	}
	if !pendingBlockIsEmpty {
		// Write whatever is still pending after the last input block.
		bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
	}
	var result partMetadata
	bw.Flush(&result)
	return &result, nil
}