in component/block_cache/block_cache.go [990:1129]
// download fills item.block.data with the block described by item and then
// signals the block via item.block.Ready.
//
// Flow:
//  1. Take the per "path::blockID" lock so that a given block of a file is
//     fetched by only one worker at a time across all open handles.
//  2. When a disk cache is configured (bc.tmpPath != ""), register or refresh
//     the block in the disk policy and try to serve it from the local file.
//  3. On a disk miss or disk failure, read the block from the next component.
//     A failed or empty read is rescheduled on the thread pool with an
//     incremented item.failCnt; once item.failCnt exceeds MAX_FAIL_CNT the
//     block is marked failed instead of being retried again.
//  4. If an ETag was returned and differs from the non-empty item.ETag, the
//     blob is treated as changed and the block is marked failed.
//  5. On success the block is persisted to the disk cache on a best-effort
//     basis and marked BlockStatusDownloaded.
//
// Disk-cache errors never fail the download: they are logged and the block is
// still delivered from the data that was read over the network.
func (bc *BlockCache) download(item *workItem) {
	fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)

	// filename_blockindex is the key for the lock.
	// This ensures that at a given time a block from a file is downloaded only
	// once across all open handles.
	flock := bc.fileLocks.Get(fileName)
	flock.Lock()
	defer flock.Unlock()

	var diskNode any
	found := false
	localPath := ""

	if bc.tmpPath != "" {
		// Update the disk policy to reflect this block file.
		diskNode, found = bc.fileNodeMap.Load(fileName)
		if !found {
			diskNode = bc.diskPolicy.Add(fileName)
			bc.fileNodeMap.Store(fileName, diskNode)
		} else {
			bc.diskPolicy.Refresh(diskNode.(*list.Element))
		}

		// Check whether a local file exists for this file and block combination.
		localPath = filepath.Join(bc.tmpPath, fileName)
		if _, err := os.Stat(localPath); err == nil {
			// The file exists, so try to read the block from the local file.
			f, err := os.Open(localPath)
			if err != nil {
				// On any disk failure we do not fail the download flow.
				log.Err("BlockCache::download : Failed to open file %s [%s]", fileName, err.Error())
				_ = os.Remove(localPath)
			} else {
				successfulRead := true

				numberOfBytes, err := f.Read(item.block.data)
				if err != nil {
					log.Err("BlockCache::download : Failed to read data from disk cache %s [%s]", fileName, err.Error())
					successfulRead = false
				}

				// A short read is acceptable only when it ends exactly at the
				// end of the file (i.e. this is the last, partial block).
				if numberOfBytes != int(bc.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
					log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
					successfulRead = false
				}

				f.Close()

				if !successfulRead {
					// Drop the unusable cache file once, after it is closed, and
					// fall through to the network download below.
					_ = os.Remove(localPath)
				} else if checkBlockConsistency(bc, item, numberOfBytes, localPath, fileName) {
					// If the user has enabled the consistency check, the stored
					// checksum matched. The data came from disk, so there is no
					// need to go over the network; just mark the block complete.
					item.block.Ready(BlockStatusDownloaded)
					return
				}
			}
		}
	}

	// Give up before issuing another network read: checking after the read
	// would waste a download whose result is thrown away.
	if item.failCnt > MAX_FAIL_CNT {
		log.Err("BlockCache::download : 3 attempts to download a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
		item.block.Failed()
		item.block.Ready(BlockStatusDownloadFailed)
		return
	}

	var etag string

	// The block was not served from disk, so download it from the container.
	n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
		Handle: item.handle,
		Offset: int64(item.block.offset),
		Data:   item.block.data,
		Etag:   &etag,
	})

	if err != nil && err != io.EOF {
		// Failed to read the data, so reschedule this request.
		log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.offset, err.Error())
		item.failCnt++
		bc.threadPool.Schedule(false, item)
		return
	}

	if n == 0 {
		// No data read, so reschedule this request.
		log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [0 bytes read]", item.handle.ID, item.handle.Path, item.block.offset)
		item.failCnt++
		bc.threadPool.Schedule(false, item)
		return
	}

	// Compare the ETag value and fail the download if the blob has changed.
	if etag != "" && item.ETag != "" && item.ETag != etag {
		log.Err("BlockCache::download : Blob has changed for %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
		item.block.Failed()
		item.block.Ready(BlockStatusDownloadFailed)
		return
	}

	if bc.tmpPath != "" {
		// Dump this block to the local disk cache. This is best effort: any
		// failure is logged and the block is still reported as downloaded.
		if err := os.MkdirAll(filepath.Dir(localPath), 0777); err != nil {
			log.Err("BlockCache::download : error creating directory structure for file %s [%s]", localPath, err.Error())
		} else if f, err := os.Create(localPath); err != nil {
			log.Err("BlockCache::download : Failed to create file %s [%s]", localPath, err.Error())
		} else {
			_, writeErr := f.Write(item.block.data[:n])
			// A close error is treated as a write failure too, since the data
			// may not have reached the file.
			if closeErr := f.Close(); writeErr == nil {
				writeErr = closeErr
			}

			persisted := writeErr == nil
			if !persisted {
				log.Err("BlockCache::download : Failed to write %s to disk [%v]", localPath, writeErr.Error())
				_ = os.Remove(localPath)
			}

			bc.diskPolicy.Refresh(diskNode.(*list.Element))

			// If the user has enabled the consistency check, compute the
			// checksum and save it in an xattr. Skipped when the file was not
			// persisted, because the path no longer exists.
			if persisted && bc.consistency {
				hash := common.GetCRC64(item.block.data, n)
				if err := syscall.Setxattr(localPath, "user.md5sum", hash, 0); err != nil {
					log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
				}
			}
		}
	}

	// Just mark the block to signal that the download is complete.
	item.block.Ready(BlockStatusDownloaded)
}