func()

in component/block_cache/block_cache.go [660:785]


func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Block, error) {
	if readoffset >= uint64(handle.Size) {
		return nil, io.EOF
	}

	// Check the given block index is already available or not
	index := bc.getBlockIndex(readoffset)
	node, found := handle.GetValue(fmt.Sprintf("%v", index))
	if !found {

		// block is not present in the buffer list, check if it is uncommitted
		// If yes, commit all the uncommitted blocks first and then download this block
		shouldCommit, shouldDownload := shouldCommitAndDownload(int64(index), handle)
		if shouldCommit {
			// commit all the uncommitted blocks to storage
			log.Debug("BlockCache::getBlock : Downloading an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
			err := bc.commitBlocks(handle)
			if err != nil {
				log.Err("BlockCache::getBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
				return nil, err
			}
		} else if !shouldCommit && !shouldDownload {
			prop, err := bc.GetAttr(internal.GetAttrOptions{Name: handle.Path, RetrieveMetadata: false})
			//if failed to get attr
			if err != nil {
				log.Err("BlockCache::getBlock : Failed to get properties for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
				return nil, err
			}

			if readoffset >= uint64(prop.Size) {
				//create a null block and return
				block := bc.blockPool.MustGet()
				block.offset = readoffset
				// block.flags.Set(BlockFlagSynced)
				log.Debug("BlockCache::getBlock : Returning a null block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
				return block, nil
			}
		}

		// If this is the first read request then prefetch all required nodes
		val, _ := handle.GetValue("#")
		if !bc.noPrefetch && val.(uint64) == 0 {
			log.Debug("BlockCache::getBlock : Starting the prefetch %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index)

			// This is the first read for this file handle so start prefetching all the nodes
			err := bc.startPrefetch(handle, index, false)
			if err != nil && err != io.EOF {
				log.Err("BlockCache::getBlock : Unable to start prefetch  %v=>%s (offset %v, index %v) [%s]", handle.ID, handle.Path, readoffset, index, err.Error())
				return nil, err
			}
		} else {
			// This is a case of random read so increment the random read count
			handle.OptCnt++

			log.Debug("BlockCache::getBlock : Unable to get block %v=>%s (offset %v, index %v) Random %v", handle.ID, handle.Path, readoffset, index, handle.OptCnt)

			// This block is not present even after prefetch so lets download it now
			err := bc.startPrefetch(handle, index, false)
			if err != nil && err != io.EOF {
				log.Err("BlockCache::getBlock : Unable to start prefetch  %v=>%s (offset %v, index %v) [%s]", handle.ID, handle.Path, readoffset, index, err.Error())
				return nil, err
			}
		}

		// This node was not found so above logic should have queued it up, retry searching now
		node, found = handle.GetValue(fmt.Sprintf("%v", index))
		if !found {
			log.Err("BlockCache::getBlock : Failed to get the required block %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index)
			return nil, fmt.Errorf("not able to find block immediately after scheudling")
		}
	}

	// We have the block now which we wish to read
	block := node.(*Block)

	// Wait for this block to complete the download
	t, ok := <-block.state
	if ok {
		// this block is now open to read and process
		block.Unblock()

		switch t {
		case BlockStatusDownloaded:
			log.Debug("BlockCache::getBlock : Downloaded block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)

			block.flags.Clear(BlockFlagDownloading)

			// Download complete and you are first reader of this block
			if !bc.noPrefetch && handle.OptCnt <= MIN_RANDREAD {
				// So far this file has been read sequentially so prefetch more
				val, _ := handle.GetValue("#")
				if int64(val.(uint64)*bc.blockSize) < handle.Size {
					_ = bc.startPrefetch(handle, val.(uint64), true)
				}
			}

			// This block was moved to in-process queue as download is complete lets move it back to normal queue
			bc.addToCooked(handle, block)

			// mark this block as synced so that if it can used for write later
			// which will move it back to cooking list as per the synced flag
			block.flags.Set(BlockFlagSynced)

		case BlockStatusUploaded:
			log.Debug("BlockCache::getBlock : Staged block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagUploading)

		case BlockStatusDownloadFailed:
			log.Err("BlockCache::getBlock : Failed to download block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)

			// Remove this node from handle so that next read retries to download the block again
			bc.releaseDownloadFailedBlock(handle, block)
			return nil, fmt.Errorf("failed to download block")

		case BlockStatusUploadFailed:
			// Local data is still valid so continue using this buffer
			log.Err("BlockCache::getBlock : Failed to upload block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagUploading)

			// Move this block to end of queue as this is still modified and un-staged
			bc.addToCooking(handle, block)
		}
	}

	return block, nil
}