func()

in banyand/stream/block_scanner.go [209:290]


// scan removes one group of parts from bsn.parts (the first group when bsn.asc
// is set, otherwise the last one), walks the blocks of that group with a
// tstIter and sends them to blockCh in batches. It returns immediately when
// bsn.parts is empty.
//
// A batch is sent once len(batch.bss) reaches cap(batch.bss); before that send
// the summed uncompressedSizeBytes of the batch is requested from bsn.pm via
// AcquireResource. A trailing partial batch is sent without such a request,
// and an empty trailing batch is released instead of sent.
//
// Failures (iterator init, iteration, resource acquisition) are reported to
// the receiver through batch.err, after which scan returns. Every send is
// raced against ctx.Done(): if cancellation wins, the batch is released
// instead of delivered, any pending error is logged, and scan returns.
func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResultBatch) {
	if len(bsn.parts) < 1 {
		return
	}
	// Take the next group from the front or the back depending on sort order.
	var parts []*part
	if bsn.asc {
		parts = bsn.parts[0]
		bsn.parts = bsn.parts[1:]
	} else {
		parts = bsn.parts[len(bsn.parts)-1]
		bsn.parts = bsn.parts[:len(bsn.parts)-1]
	}
	bma := generateBlockMetadataArray()
	defer releaseBlockMetadataArray(bma)
	ti := generateTstIter()
	defer releaseTstIter(ti)
	ti.init(bma, parts, bsn.qo.sortedSids, bsn.qo.minTimestamp, bsn.qo.maxTimestamp)

	// send hands b to blockCh and reports true on delivery. If ctx is canceled
	// first, b is released and false is returned; the caller must not touch b
	// afterwards in either case.
	send := func(b *blockScanResultBatch) bool {
		select {
		case blockCh <- b:
			return true
		case <-ctx.Done():
			releaseBlockScanResultBatch(b)
			return false
		}
	}

	batch := generateBlockScanResultBatch()
	if err := ti.Error(); err != nil {
		batch.err = fmt.Errorf("cannot init tstIter: %w", err)
		if !send(batch) {
			bsn.l.Warn().Err(err).Msg("cannot init tstIter")
		}
		return
	}
	for ti.nextBlock() {
		p := ti.piHeap[0]
		batch.bss = append(batch.bss, blockScanResult{
			p: p.p,
		})
		bs := &batch.bss[len(batch.bss)-1]
		bs.qo.copyFrom(&bsn.qo)
		bs.qo.elementFilter = bsn.filterIndex[p.p.partMetadata.ID]
		bs.bm.copyFrom(p.curBlock)
		if len(batch.bss) < cap(batch.bss) {
			// Batch still has room; keep collecting blocks.
			continue
		}
		var totalBlockBytes uint64
		for i := range batch.bss {
			totalBlockBytes += batch.bss[i].bm.uncompressedSizeBytes
		}
		if err := bsn.pm.AcquireResource(ctx, totalBlockBytes); err != nil {
			batch.err = fmt.Errorf("cannot acquire resource: %w", err)
			if !send(batch) {
				bsn.l.Warn().Err(err).Msg("cannot acquire resource")
			}
			return
		}
		// Capture the length before sending: once send returns false the batch
		// has already been released (presumably reset/pooled), so reading
		// batch.bss at that point would be a use-after-release.
		batchLen := len(batch.bss)
		if !send(batch) {
			bsn.l.Warn().Int("batch.len", batchLen).Msg("context canceled while sending block")
			return
		}
		batch = generateBlockScanResultBatch()
	}
	if err := ti.Error(); err != nil {
		batch.err = fmt.Errorf("cannot iterate tstIter: %w", err)
		if !send(batch) {
			// Nobody received the error; log it instead of dropping it,
			// matching the init and acquire-resource failure paths.
			bsn.l.Warn().Err(err).Msg("cannot iterate tstIter")
		}
		return
	}
	if len(batch.bss) > 0 {
		// Flush the trailing partial batch; send releases it on cancellation.
		send(batch)
		return
	}
	// Nothing was collected after the last flush.
	releaseBlockScanResultBatch(batch)
}