func()

in component/block_cache/block_cache.go [793:903]


// startPrefetch lines up one or more blocks of the file behind handle for download,
// beginning at block number index.
//
// When handle.OptCnt is above MIN_RANDREAD the handle is treated as a random reader:
// if it holds more than MIN_PREFETCH buffers, finished in-flight blocks are moved to the
// Cooked list, Cooked is trimmed down to MIN_PREFETCH entries (the surplus goes back to the
// block pool), and exactly one block is requested. Otherwise the handle is treated as a
// sequential reader: up to MIN_PREFETCH new buffers are taken from the pool (bounded by
// bc.prefetch) and one block is requested per buffer obtained, or a single block when no
// buffer could be added.
//
// prefetch is forwarded to refreshBlock for the first block queued; every additional block
// queued by the same call is always passed as a prefetch.
//
// It returns the first error reported by commitBlocks or refreshBlock, or nil.
func (bc *BlockCache) startPrefetch(handle *handlemap.Handle, index uint64, prefetch bool) error {
	// Buffers this handle currently owns, ready (Cooked) plus in-process (Cooking)
	held := handle.Buffers.Cooked.Len() + handle.Buffers.Cooking.Len()

	// Number of blocks this call will try to line up for download
	var toQueue uint32

	switch {
	case handle.OptCnt <= MIN_RANDREAD:
		// Reads have been sequential so far: grow the buffer set until either the
		// configured prefetch limit or the per-call cap of MIN_PREFETCH is reached
		for held < int(bc.prefetch) && toQueue < MIN_PREFETCH {
			if fresh := bc.blockPool.TryGet(); fresh != nil {
				fresh.node = handle.Buffers.Cooked.PushFront(fresh)
				toQueue++
			}
			// Counted even when the pool had nothing to give, which keeps the loop bounded
			held++
		}

		// Nothing new was allocated, so fall back to a sliding window:
		// one existing block gets recycled for one new download
		if toQueue == 0 {
			toQueue = 1
		}

	default:
		// Random read threshold has been crossed for this handle
		if held > MIN_PREFETCH {
			// Random readers do not need a deep buffer set, so give back the surplus
			log.Info("BlockCache::startPrefetch : Cleanup excessive blocks  %v=>%s index %v", handle.ID, handle.Path, index)

			// Pass 1: visit every in-process block exactly once and sort it into the right list.
			// Blocks that stay in-process are re-queued at the tail, hence the fixed visit count.
			// This list may also hold dirty blocks that are yet to be committed.
			cooking := handle.Buffers.Cooking
			inFlight := cooking.Len()
			for seen := 0; seen < inFlight; seen++ {
				head := cooking.Front()
				if head == nil {
					break
				}

				blk := cooking.Remove(head).(*Block)
				blk.node = nil

				// Non-blocking probe of the block state: only a real value received from
				// the channel marks the block as downloaded. A closed channel or an empty
				// channel leaves the block in the in-process list.
				finished := false
				select {
				case _, open := <-blk.state:
					finished = open
				default:
				}

				if finished {
					// We consumed the state as its first reader, so release any future readers
					blk.flags.Clear(BlockFlagDownloading)
					blk.Unblock()
					// Download is complete, the block may now be reused
					blk.node = handle.Buffers.Cooked.PushBack(blk)
				} else {
					// Still being downloaded (or not readable), it can not be reused yet
					blk.node = cooking.PushBack(blk)
				}
			}

			// Pass 2: trim the ready list from the front until only MIN_PREFETCH blocks remain
			cooked := handle.Buffers.Cooked
			for left := cooked.Len(); left > MIN_PREFETCH; left-- {
				head := cooked.Front()
				if head == nil {
					break
				}

				blk := head.Value.(*Block)
				cooked.Remove(head)

				// Drop the map entry first so nobody can look this block up any more
				handle.RemoveValue(fmt.Sprintf("%v", blk.id))
				blk.node = nil

				// Reset the block and hand it back to the pool
				blk.ReUse()
				bc.blockPool.Release(blk)
			}
		}

		// Prefetching is effectively disabled for random reads: fetch only the requested block
		toQueue = 1
	}

	for i := uint32(0); i < toQueue; i++ {
		// Blocks already tracked by the handle need no download.
		// Note that index only advances after a block has been queued, so once a tracked
		// block is hit the remaining iterations re-check this same index.
		if _, cached := handle.GetValue(fmt.Sprintf("%v", index)); cached {
			continue
		}

		// An uncommitted block has to be committed before it can be downloaded.
		// This shall happen only for the first uncommitted block and flushes all staged blocks.
		if mustCommit, _ := shouldCommitAndDownload(int64(index), handle); mustCommit {
			log.Debug("BlockCache::startPrefetch : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
			if err := bc.commitBlocks(handle); err != nil {
				log.Err("BlockCache::startPrefetch : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
				return err
			}
		}

		// Queue the block for download; anything after the first block is always a prefetch
		if err := bc.refreshBlock(handle, index, prefetch || i > 0); err != nil {
			return err
		}
		index++
	}

	return nil
}