in component/block_cache/block_cache.go [1196:1321]
// getOrCreateBlock returns the block that covers offset for a write on handle,
// creating and registering a new block when the handle does not track one yet.
//
// New block (no entry for the block index in the handle):
//   - a block is taken from the pool and initialised with the index and offset;
//   - if the block starts inside the current file size, shouldCommitAndDownload
//     decides whether the staged blocks must be committed first and whether the
//     block has to be downloaded before it can be overwritten; otherwise the
//     block is simply appended to the cooking list;
//   - the block is registered in the handle and, once the cooking list grows
//     beyond MIN_WRITE_BLOCK, one block is pushed for staging.
//
// Existing block: the call waits for an in-flight download or upload to finish,
// moves the block to the cooking list and clears its uploading, downloading and
// synced flags.
//
// An error is returned when the block index is >= MAX_BLOCKS, when no block can
// be allocated, when committing the staged blocks fails, or when the block
// fails to download. The returned block is nil in all of those cases.
//
// NOTE(review): nothing in this function locks the handle; presumably the
// caller already holds it - confirm against the call sites.
func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) (*Block, error) {
	// Check the given block index is already available or not
	index := bc.getBlockIndex(offset)
	if index >= MAX_BLOCKS {
		log.Err("BlockCache::getOrCreateBlock : Failed to get Block %v=>%s offset %v", handle.ID, handle.Path, offset)
		return nil, fmt.Errorf("block index out of range. Increase your block size")
	}

	// log.Debug("BlockCache::getOrCreateBlock : Get block for %s, index %v", handle.Path, index)

	var block *Block
	var err error

	// Blocks are stored in the handle's value map keyed by their index rendered as a string.
	key := fmt.Sprintf("%v", index)

	node, found := handle.GetValue(key)
	if !found {
		// If too many buffers are piled up for this file then try to evict some of those which are already uploaded
		if handle.Buffers.Cooked.Len()+handle.Buffers.Cooking.Len() >= int(bc.prefetch) {
			bc.waitAndFreeUploadedBlocks(handle, 1)
		}

		// Either the block is not fetched yet or offset goes beyond the file size
		block = bc.blockPool.MustGet()
		if block == nil {
			log.Err("BlockCache::getOrCreateBlock : Unable to allocate block %v=>%s (index %v)", handle.ID, handle.Path, index)
			return nil, fmt.Errorf("unable to allocate block")
		}

		block.node = nil
		block.id = int64(index)
		block.offset = index * bc.blockSize

		if block.offset < uint64(handle.Size) {
			shouldCommit, shouldDownload := shouldCommitAndDownload(block.id, handle)

			// if a block has been staged and deleted from the buffer list, then we should commit the existing blocks
			// commit the dirty blocks and download the given block
			if shouldCommit {
				log.Debug("BlockCache::getOrCreateBlock : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", block.id, handle.ID, handle.Path)
				err = bc.commitBlocks(handle)
				if err != nil {
					log.Err("BlockCache::getOrCreateBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
					// The block taken from the pool above is not linked into any list and
					// is not registered in the handle yet, so returning here without
					// releasing it would lose it. Hand it back through the same cleanup
					// helper that the download-failure path below uses.
					// NOTE(review): confirm releaseDownloadFailedBlock accepts a block whose
					// node is nil and which has no entry in the handle.
					bc.releaseDownloadFailedBlock(handle, block)
					return nil, err
				}
			}

			// download the block if,
			//    - it was already committed, or
			//    - it was committed by the above commit blocks operation
			if shouldDownload || shouldCommit {
				// We are writing somewhere in between so just fetch this block
				log.Debug("BlockCache::getOrCreateBlock : Downloading block %v for %v=>%v", block.id, handle.ID, handle.Path)
				bc.lineupDownload(handle, block, false)

				// Now wait for download to complete
				<-block.state

				// if the block failed to download, it can't be used for overwriting
				if block.IsFailed() {
					log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

					// Remove this node from handle so that next read retries to download the block again
					bc.releaseDownloadFailedBlock(handle, block)
					return nil, fmt.Errorf("failed to download block")
				}
			} else {
				log.Debug("BlockCache::getOrCreateBlock : push block %v to the cooking list for %v=>%v", block.id, handle.ID, handle.Path)
				block.node = handle.Buffers.Cooking.PushBack(block)
			}
		} else {
			// The block starts at or beyond the current file size, so there is nothing to fetch for it.
			block.node = handle.Buffers.Cooking.PushBack(block)
		}

		handle.SetValue(key, block)
		block.flags.Clear(BlockFlagDownloading)
		block.Unblock()

		// As we are creating new blocks here, we need to push the block for upload and remove them from list here
		if handle.Buffers.Cooking.Len() > MIN_WRITE_BLOCK {
			err = bc.stageBlocks(handle, 1)
			if err != nil {
				// Staging failure is only logged; the freshly created block is still returned to the caller.
				log.Err("BlockCache::getOrCreateBlock : Unable to stage blocks for %s [%s]", handle.Path, err.Error())
			}
		}
	} else {
		// We have the block now which we wish to write
		block = node.(*Block)

		// If the block was staged earlier then we are overwriting it here so move it back to cooking queue
		if block.flags.IsSet(BlockFlagSynced) {
			log.Debug("BlockCache::getOrCreateBlock : Overwriting back to staged block %v for %v=>%s", block.id, handle.ID, handle.Path)
		} else if block.flags.IsSet(BlockFlagDownloading) {
			log.Debug("BlockCache::getOrCreateBlock : Waiting for download to finish for committed block %v for %v=>%s", block.id, handle.ID, handle.Path)

			// ok is false when the state channel has already been closed; Unblock is
			// only called when a value was actually received from it.
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}

			// if the block failed to download, it can't be used for overwriting
			if block.IsFailed() {
				log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

				// Remove this node from handle so that next read retries to download the block again
				bc.releaseDownloadFailedBlock(handle, block)
				return nil, fmt.Errorf("failed to download block")
			}
		} else if block.flags.IsSet(BlockFlagUploading) {
			// If the block is being staged, then wait till it is uploaded,
			// and then write to the same block and move it back to cooking queue
			log.Debug("BlockCache::getOrCreateBlock : Waiting for the block %v to upload for %v=>%s", block.id, handle.ID, handle.Path)

			// Same receive-then-Unblock handling as the download wait above.
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}
		}

		// The block is about to be written again: move it to the cooking list and
		// drop every flag that described its previous transfer state.
		bc.addToCooking(handle, block)

		block.flags.Clear(BlockFlagUploading)
		block.flags.Clear(BlockFlagDownloading)
		block.flags.Clear(BlockFlagSynced)
	}

	return block, nil
}