func()

in component/block_cache/block_cache.go [990:1129]


func (bc *BlockCache) download(item *workItem) {
	fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)

	// filename_blockindex is the key for the lock
	// this ensure that at a given time a block from a file is downloaded only once across all open handles
	flock := bc.fileLocks.Get(fileName)
	flock.Lock()
	defer flock.Unlock()

	var diskNode any
	found := false
	localPath := ""

	if bc.tmpPath != "" {
		// Update diskpolicy to reflect the new file
		diskNode, found = bc.fileNodeMap.Load(fileName)
		if !found {
			diskNode = bc.diskPolicy.Add(fileName)
			bc.fileNodeMap.Store(fileName, diskNode)
		} else {
			bc.diskPolicy.Refresh(diskNode.(*list.Element))
		}

		// Check local file exists for this offset and file combination or not
		localPath = filepath.Join(bc.tmpPath, fileName)
		_, err := os.Stat(localPath)

		if err == nil {
			// If file exists then read the block from the local file
			f, err := os.Open(localPath)
			if err != nil {
				// On any disk failure we do not fail the download flow
				log.Err("BlockCache::download : Failed to open file %s [%s]", fileName, err.Error())
				_ = os.Remove(localPath)
			} else {
				var successfulRead bool = true
				numberOfBytes, err := f.Read(item.block.data)
				if err != nil {
					log.Err("BlockCache::download : Failed to read data from disk cache %s [%s]", fileName, err.Error())
					successfulRead = false
					_ = os.Remove(localPath)
				}

				if numberOfBytes != int(bc.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
					log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
					successfulRead = false
					_ = os.Remove(localPath)
				}

				f.Close()

				if successfulRead {
					// If user has enabled consistency check then compute the md5sum and match it in xattr
					successfulRead = checkBlockConsistency(bc, item, numberOfBytes, localPath, fileName)

					// We have read the data from disk so there is no need to go over network
					// Just mark the block that download is complete
					if successfulRead {
						item.block.Ready(BlockStatusDownloaded)
						return
					}
				}
			}
		}
	}

	var etag string
	// If file does not exists then download the block from the container
	n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
		Handle: item.handle,
		Offset: int64(item.block.offset),
		Data:   item.block.data,
		Etag:   &etag,
	})

	if item.failCnt > MAX_FAIL_CNT {
		// If we failed to read the data 3 times then just give up
		log.Err("BlockCache::download : 3 attempts to download a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
		item.block.Failed()
		item.block.Ready(BlockStatusDownloadFailed)
		return
	}

	if err != nil && err != io.EOF {
		// Fail to read the data so just reschedule this request
		log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
		item.failCnt++
		bc.threadPool.Schedule(false, item)
		return
	} else if n == 0 {
		// No data read so just reschedule this request
		log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [0 bytes read]", item.handle.ID, item.handle.Path, item.block.id)
		item.failCnt++
		bc.threadPool.Schedule(false, item)
		return
	}

	// Compare the ETAG value and fail download if blob has changed
	if etag != "" {
		if item.ETag != "" && item.ETag != etag {
			log.Err("BlockCache::download : Blob has changed for %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
			item.block.Failed()
			item.block.Ready(BlockStatusDownloadFailed)
			return
		}
	}

	if bc.tmpPath != "" {
		err := os.MkdirAll(filepath.Dir(localPath), 0777)
		if err != nil {
			log.Err("BlockCache::download : error creating directory structure for file %s [%s]", localPath, err.Error())
			return
		}

		// Dump this block to local disk cache
		f, err := os.Create(localPath)
		if err == nil {
			_, err := f.Write(item.block.data[:n])
			if err != nil {
				log.Err("BlockCache::download : Failed to write %s to disk [%v]", localPath, err.Error())
				_ = os.Remove(localPath)
			}

			f.Close()
			bc.diskPolicy.Refresh(diskNode.(*list.Element))

			// If user has enabled consistency check then compute the md5sum and save it in xattr
			if bc.consistency {
				hash := common.GetCRC64(item.block.data, n)
				err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
				if err != nil {
					log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
				}
			}
		}
	}

	// Just mark the block that download is complete
	item.block.Ready(BlockStatusDownloaded)
}