in component/block_cache/block_cache.go [660:785]
// getBlock returns the Block that covers the given read offset for this handle.
// If the block is not already buffered it may commit staged blocks, synthesize a
// zero block for reads past the remote EOF, or schedule a download/prefetch, then
// waits for the block's download to finish before returning it.
// Returns io.EOF when readoffset is at or beyond the handle's known size.
func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Block, error) {
	if readoffset >= uint64(handle.Size) {
		return nil, io.EOF
	}

	// Check whether the block holding this offset is already in the buffer list.
	index := bc.getBlockIndex(readoffset)
	node, found := handle.GetValue(fmt.Sprintf("%v", index))
	if !found {
		// Block is not present in the buffer list, check if it is uncommitted.
		// If yes, commit all the uncommitted blocks first and then download this block.
		shouldCommit, shouldDownload := shouldCommitAndDownload(int64(index), handle)
		if shouldCommit {
			// Commit all the uncommitted (staged) blocks to storage.
			log.Debug("BlockCache::getBlock : Downloading an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
			err := bc.commitBlocks(handle)
			if err != nil {
				log.Err("BlockCache::getBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
				return nil, err
			}
		} else if !shouldDownload {
			// Neither staged nor downloadable: the read may lie beyond the remote
			// file size, so check the attributes before deciding what to return.
			prop, err := bc.GetAttr(internal.GetAttrOptions{Name: handle.Path, RetrieveMetadata: false})
			if err != nil {
				log.Err("BlockCache::getBlock : Failed to get properties for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
				return nil, err
			}

			if readoffset >= uint64(prop.Size) {
				// Reading past the remote EOF: hand back a zero-filled block.
				block := bc.blockPool.MustGet()
				block.offset = readoffset
				// block.flags.Set(BlockFlagSynced)
				log.Debug("BlockCache::getBlock : Returning a null block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
				return block, nil
			}
		}

		// If this is the first read request then prefetch all required nodes.
		val, _ := handle.GetValue("#")
		if !bc.noPrefetch && val.(uint64) == 0 {
			// First read for this file handle, so start prefetching all the nodes.
			log.Debug("BlockCache::getBlock : Starting the prefetch %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index)
		} else {
			// This is a case of random read so increment the random read count.
			handle.OptCnt++
			log.Debug("BlockCache::getBlock : Unable to get block %v=>%s (offset %v, index %v) Random %v", handle.ID, handle.Path, readoffset, index, handle.OptCnt)
		}

		// Either way the block is not present, so queue its download now.
		err := bc.startPrefetch(handle, index, false)
		if err != nil && err != io.EOF {
			log.Err("BlockCache::getBlock : Unable to start prefetch %v=>%s (offset %v, index %v) [%s]", handle.ID, handle.Path, readoffset, index, err.Error())
			return nil, err
		}

		// This node was not found so above logic should have queued it up, retry searching now.
		node, found = handle.GetValue(fmt.Sprintf("%v", index))
		if !found {
			log.Err("BlockCache::getBlock : Failed to get the required block %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index)
			return nil, fmt.Errorf("not able to find block immediately after scheduling")
		}
	}

	// We have the block now which we wish to read.
	block := node.(*Block)

	// Wait for this block to complete the download; a closed channel means the
	// block is already usable and needs no state transition here.
	t, ok := <-block.state
	if ok {
		// This block is now open to read and process.
		block.Unblock()

		switch t {
		case BlockStatusDownloaded:
			log.Debug("BlockCache::getBlock : Downloaded block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagDownloading)

			// Download complete and you are the first reader of this block.
			if !bc.noPrefetch && handle.OptCnt <= MIN_RANDREAD {
				// So far this file has been read sequentially so prefetch more.
				val, _ := handle.GetValue("#")
				if int64(val.(uint64)*bc.blockSize) < handle.Size {
					_ = bc.startPrefetch(handle, val.(uint64), true)
				}
			}

			// This block was moved to the in-process queue as download is complete;
			// move it back to the normal (cooked) queue.
			bc.addToCooked(handle, block)

			// Mark this block as synced so that it can be used for a write later,
			// which will move it back to the cooking list as per the synced flag.
			block.flags.Set(BlockFlagSynced)

		case BlockStatusUploaded:
			log.Debug("BlockCache::getBlock : Staged block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagUploading)

		case BlockStatusDownloadFailed:
			log.Err("BlockCache::getBlock : Failed to download block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)

			// Remove this node from the handle so that the next read retries the download.
			bc.releaseDownloadFailedBlock(handle, block)
			return nil, fmt.Errorf("failed to download block")

		case BlockStatusUploadFailed:
			// Local data is still valid so continue using this buffer.
			log.Err("BlockCache::getBlock : Failed to upload block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagUploading)

			// Move this block to the end of the queue as it is still modified and un-staged.
			bc.addToCooking(handle, block)
		}
	}

	return block, nil
}