in component/block_cache/block_cache.go [793:903]
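// startPrefetch schedules block downloads for a handle. Handles flagged as
// random-read are trimmed down to MIN_PREFETCH buffers and fetch only the
// requested block; sequential handles keep allocating buffers up to the
// configured prefetch count (bc.prefetch) and read ahead of the caller.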
func (bc *BlockCache) startPrefetch(handle *handlemap.Handle, index uint64, prefetch bool) error {
// Calculate how many buffers we have in free and in-process queue
currentCnt := handle.Buffers.Cooked.Len() + handle.Buffers.Cooking.Len()
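// cnt tracks how many new block downloads to schedule in this call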
cnt := uint32(0)
if handle.OptCnt > MIN_RANDREAD {
// This handle has been read randomly and we have crossed the threshold to declare it a random-read case
if currentCnt > MIN_PREFETCH {
// As this file is in random-read mode now, release the excess buffers and keep only MIN_PREFETCH buffers for it to work
log.Info("BlockCache::startPrefetch : Cleanup excessive blocks %v=>%s index %v", handle.ID, handle.Path, index)
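// Cleanup happens in two phases: first drain the in-process (Cooking) list, then trim the free (Cooked) list down to MIN_PREFETCH buffers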
// Phase one: move every fully downloaded in-process block to the free list; blocks still downloading stay in the Cooking list
nodeList := handle.Buffers.Cooking
currentCnt = nodeList.Len()
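// Bound the walk by the list's original length: blocks may be pushed back onto this same list below, so a nil check alone would never terminate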
node := nodeList.Front()
for i := 0; node != nil && i < currentCnt; node = nodeList.Front() {
// Test whether this block is already downloaded or still under download
block := nodeList.Remove(node).(*Block)
block.node = nil
i++
// This list may contain dirty blocks which are yet to be committed.
select {
case _, ok := <-block.state:
// As we are the first reader of this block, it's important to unblock any future readers waiting on it
if ok {
block.flags.Clear(BlockFlagDownloading)
block.Unblock()
// Block is downloaded, so it is safe to queue it for reuse
block.node = handle.Buffers.Cooked.PushBack(block)
} else {
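// The state channel was already closed; keep the block in the in-process list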
block.node = handle.Buffers.Cooking.PushBack(block)
}
default:
// Block is still being downloaded, so it cannot be reused yet
block.node = handle.Buffers.Cooking.PushBack(block)
}
}
// Phase two: trim the cooked list down to MIN_PREFETCH blocks, releasing the excess back to the pool
nodeList = handle.Buffers.Cooked
currentCnt = nodeList.Len()
node = nodeList.Front()
for ; node != nil && currentCnt > MIN_PREFETCH; node = nodeList.Front() {
block := node.Value.(*Block)
_ = nodeList.Remove(node)
// Remove this block's entry from the map so that no reader can find it anymore
handle.RemoveValue(fmt.Sprintf("%v", block.id))
block.node = nil
// Return this block to the pool for reuse
block.ReUse()
bc.blockPool.Release(block)
currentCnt--
}
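// At most MIN_PREFETCH ready-to-use buffers now remain in the free (Cooked) list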
}
// In the random-read case download only the requested block.
// This is where prefetching is effectively disabled: we schedule just the one block that was asked for.
cnt = 1
} else {
// This handle has seen only sequential reads so far
// Allocate more buffers if required until we hit the prefetch count limit
for ; currentCnt < int(bc.prefetch) && cnt < MIN_PREFETCH; currentCnt++ {
block := bc.blockPool.TryGet()
if block != nil {
block.node = handle.Buffers.Cooked.PushFront(block)
cnt++
}
}
// If no new buffers were allocated then all available buffers are already assigned to this handle.
// Time to switch to a sliding window: remove one block and line up a new block for download.
if cnt == 0 {
cnt = 1
}
}
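// Schedule up to cnt block downloads starting at the requested index; if a block is already present in the handle's map no new download is queued for it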
for i := uint32(0); i < cnt; i++ {
// Check whether the block already exists in the handle's local cache.
// If not, download it from storage.
_, found := handle.GetValue(fmt.Sprintf("%v", index))
if !found {
// Check whether the block is an uncommitted (staged) block.
// An uncommitted block must be committed before it can be downloaded.
shouldCommit, _ := shouldCommitAndDownload(int64(index), handle)
if shouldCommit {
// This happens only for the first uncommitted block encountered; it flushes all the staged blocks to storage
log.Debug("BlockCache::startPrefetch : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
err := bc.commitBlocks(handle)
if err != nil {
log.Err("BlockCache::startPrefetch : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
return err
}
}
// Push the block for download
err := bc.refreshBlock(handle, index, prefetch || i > 0)
if err != nil {
return err
}
index++
}
}
return nil
}
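
// --- Illustrative sketch, not part of block_cache.go ---
// A minimal, self-contained example of the non-blocking state-channel check
// used in the random-read cleanup above. The `block` type and its single
// `state` field here are hypothetical stand-ins for the real Block type: a
// select with a default branch peeks at a download's completion signal
// without ever waiting on it.
package main

import "fmt"

type block struct {
	state chan int
}

// ready reports, without blocking, whether a block's download has finished.
func ready(b *block) bool {
	select {
	case _, ok := <-b.state:
		// ok == true: the downloader delivered its completion signal and we
		// are the first observer. ok == false: the channel was already closed
		// by an earlier observer. The code above treats only ok == true as
		// safe to move the block to the free list.
		return ok
	default:
		// Nothing on the channel yet: the download is still in flight.
		return false
	}
}

func main() {
	inFlight := &block{state: make(chan int, 1)}

	done := &block{state: make(chan int, 1)}
	done.state <- 1 // the downloader signals completion with a single value

	fmt.Println(ready(inFlight)) // false: still downloading
	fmt.Println(ready(done))     // true: safe to recycle the buffer
}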