func()

in component/block_cache/block_cache.go [1196:1321]


// getOrCreateBlock returns the block covering offset for the file behind handle so that the
// caller can write into it, creating the block when the handle does not track one yet.
//
// When the handle has no block stored for the computed index, a block is taken from the
// block pool. If that block starts inside the current file size, shouldCommitAndDownload
// decides whether the staged blocks have to be committed first and whether the existing
// content has to be downloaded before it can be overwritten; in every other case the block
// is appended to the handle's cooking list. When the handle already tracks the block, any
// in-flight download or upload is waited for and the block is passed to addToCooking.
//
// An error is returned when the block index is not below MAX_BLOCKS, when committing the
// staged blocks fails, when no block can be allocated, or when the download of the block fails.
func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) (*Block, error) {
	// Check the given block index is already available or not
	index := bc.getBlockIndex(offset)
	if index >= MAX_BLOCKS {
		log.Err("BlockCache::getOrCreateBlock : Failed to get Block %v=>%s offset %v", handle.ID, handle.Path, offset)
		return nil, fmt.Errorf("block index out of range. Increase your block size")
	}

	// log.Debug("BlockCache::getOrCreateBlock : Get block for %s, index %v", handle.Path, index)

	var block *Block
	var err error

	// Blocks are stored in the handle under the formatted block index
	key := fmt.Sprintf("%v", index)

	node, found := handle.GetValue(key)
	if !found {
		// If too many buffers are piled up for this file then try to evict some of those which are already uploaded
		if handle.Buffers.Cooked.Len()+handle.Buffers.Cooking.Len() >= int(bc.prefetch) {
			bc.waitAndFreeUploadedBlocks(handle, 1)
		}

		blockID := int64(index)
		blockOffset := index * bc.blockSize

		// Only a block that starts inside the current file size may need a commit or a download
		withinFile := blockOffset < uint64(handle.Size)
		needsDownload := false

		if withinFile {
			shouldCommit, shouldDownload := shouldCommitAndDownload(blockID, handle)

			// if a block has been staged and deleted from the buffer list, then we should commit the existing blocks
			// commit the dirty blocks and download the given block
			//
			// The commit is done before a block is taken from the pool, so that a commit failure
			// returns without holding a pool block that nothing would hand back.
			if shouldCommit {
				log.Debug("BlockCache::getOrCreateBlock : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", blockID, handle.ID, handle.Path)
				err = bc.commitBlocks(handle)
				if err != nil {
					log.Err("BlockCache::getOrCreateBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
					return nil, err
				}
			}

			// download the block if,
			//    - it was already committed, or
			//    - it was committed by the above commit blocks operation
			needsDownload = shouldDownload || shouldCommit
		}

		// Either the block is not fetched yet or offset goes beyond the file size
		block = bc.blockPool.MustGet()
		if block == nil {
			log.Err("BlockCache::getOrCreateBlock : Unable to allocate block %v=>%s (index %v)", handle.ID, handle.Path, index)
			return nil, fmt.Errorf("unable to allocate block")
		}

		block.node = nil
		block.id = blockID
		block.offset = blockOffset

		switch {
		case needsDownload:
			// We are writing somewhere in between so just fetch this block
			log.Debug("BlockCache::getOrCreateBlock : Downloading block %v for %v=>%v", block.id, handle.ID, handle.Path)
			bc.lineupDownload(handle, block, false)

			// Now wait for download to complete
			<-block.state

			// if the block failed to download, it can't be used for overwriting
			if block.IsFailed() {
				log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

				// Remove this node from handle so that next read retries to download the block again
				bc.releaseDownloadFailedBlock(handle, block)
				return nil, fmt.Errorf("failed to download block")
			}

		case withinFile:
			// Block lies inside the file but neither a commit nor a download is required
			log.Debug("BlockCache::getOrCreateBlock : push block %v to the cooking list for %v=>%v", block.id, handle.ID, handle.Path)
			block.node = handle.Buffers.Cooking.PushBack(block)

		default:
			// Block starts at or beyond the current file size, so there is nothing to fetch
			block.node = handle.Buffers.Cooking.PushBack(block)
		}

		handle.SetValue(key, block)
		block.flags.Clear(BlockFlagDownloading)
		block.Unblock()

		// As we are creating new blocks here, we need to push the block for upload and remove them from list here
		if handle.Buffers.Cooking.Len() > MIN_WRITE_BLOCK {
			err = bc.stageBlocks(handle, 1)
			if err != nil {
				// Staging failure is only logged; the freshly created block is still returned to the caller
				log.Err("BlockCache::getOrCreateBlock : Unable to stage blocks for %s [%s]", handle.Path, err.Error())
			}
		}

	} else {
		// We have the block now which we wish to write
		block = node.(*Block)

		// If the block was staged earlier then we are overwriting it here so move it back to cooking queue
		// (the move itself is done by the addToCooking call below)
		if block.flags.IsSet(BlockFlagSynced) {
			log.Debug("BlockCache::getOrCreateBlock : Overwriting back to staged block %v for %v=>%s", block.id, handle.ID, handle.Path)

		} else if block.flags.IsSet(BlockFlagDownloading) {
			log.Debug("BlockCache::getOrCreateBlock : Waiting for download to finish for committed block %v for %v=>%s", block.id, handle.ID, handle.Path)
			// ok is false when the state channel has been closed; Unblock is called only
			// when a value was actually received from it
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}

			// if the block failed to download, it can't be used for overwriting
			if block.IsFailed() {
				log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

				// Remove this node from handle so that next read retries to download the block again
				bc.releaseDownloadFailedBlock(handle, block)
				return nil, fmt.Errorf("failed to download block")
			}
		} else if block.flags.IsSet(BlockFlagUploading) {
			// If the block is being staged, then wait till it is uploaded,
			// and then write to the same block and move it back to cooking queue
			log.Debug("BlockCache::getOrCreateBlock : Waiting for the block %v to upload for %v=>%s", block.id, handle.ID, handle.Path)
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}
		}

		bc.addToCooking(handle, block)

		// The block is about to be written again, so drop every transfer/sync flag
		block.flags.Clear(BlockFlagUploading)
		block.flags.Clear(BlockFlagDownloading)
		block.flags.Clear(BlockFlagSynced)
	}

	return block, nil
}