in banyand/stream/block_scanner.go [209:290]
func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResultBatch) {
if len(bsn.parts) < 1 {
return
}
var parts []*part
if bsn.asc {
parts = bsn.parts[0]
bsn.parts = bsn.parts[1:]
} else {
parts = bsn.parts[len(bsn.parts)-1]
bsn.parts = bsn.parts[:len(bsn.parts)-1]
}
bma := generateBlockMetadataArray()
defer releaseBlockMetadataArray(bma)
ti := generateTstIter()
defer releaseTstIter(ti)
ti.init(bma, parts, bsn.qo.sortedSids, bsn.qo.minTimestamp, bsn.qo.maxTimestamp)
batch := generateBlockScanResultBatch()
if ti.Error() != nil {
batch.err = fmt.Errorf("cannot init tstIter: %w", ti.Error())
select {
case blockCh <- batch:
case <-ctx.Done():
releaseBlockScanResultBatch(batch)
bsn.l.Warn().Err(ti.Error()).Msg("cannot init tstIter")
}
return
}
for ti.nextBlock() {
p := ti.piHeap[0]
batch.bss = append(batch.bss, blockScanResult{
p: p.p,
})
bs := &batch.bss[len(batch.bss)-1]
bs.qo.copyFrom(&bsn.qo)
bs.qo.elementFilter = bsn.filterIndex[p.p.partMetadata.ID]
bs.bm.copyFrom(p.curBlock)
if len(batch.bss) >= cap(batch.bss) {
var totalBlockBytes uint64
for i := range batch.bss {
totalBlockBytes += batch.bss[i].bm.uncompressedSizeBytes
}
if err := bsn.pm.AcquireResource(ctx, totalBlockBytes); err != nil {
batch.err = fmt.Errorf("cannot acquire resource: %w", err)
select {
case blockCh <- batch:
case <-ctx.Done():
releaseBlockScanResultBatch(batch)
bsn.l.Warn().Err(err).Msg("cannot acquire resource")
}
return
}
select {
case blockCh <- batch:
case <-ctx.Done():
releaseBlockScanResultBatch(batch)
bsn.l.Warn().Int("batch.len", len(batch.bss)).Msg("context canceled while sending block")
return
}
batch = generateBlockScanResultBatch()
}
}
if ti.Error() != nil {
batch.err = fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
select {
case blockCh <- batch:
case <-ctx.Done():
releaseBlockScanResultBatch(batch)
}
return
}
if len(batch.bss) > 0 {
select {
case blockCh <- batch:
case <-ctx.Done():
releaseBlockScanResultBatch(batch)
}
return
}
releaseBlockScanResultBatch(batch)
}