in component/block_cache/block_cache.go [1485:1560]
func (bc *BlockCache) upload(item *workItem) {
fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)
// filename_blockindex is the key for the lock
// this ensure that at a given time a block from a file is downloaded only once across all open handles
flock := bc.fileLocks.Get(fileName)
flock.Lock()
defer flock.Unlock()
blockSize := bc.getBlockSize(uint64(item.handle.Size), item.block)
// This block is updated so we need to stage it now
err := bc.NextComponent().StageData(internal.StageDataOptions{
Name: item.handle.Path,
Data: item.block.data[0:blockSize],
Offset: uint64(item.block.offset),
Id: item.blockId})
if err != nil {
// Fail to write the data so just reschedule this request
log.Err("BlockCache::upload : Failed to write %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
item.failCnt++
if item.failCnt > MAX_FAIL_CNT {
// If we failed to write the data 3 times then just give up
log.Err("BlockCache::upload : 3 attempts to upload a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
item.block.Failed()
item.block.Ready(BlockStatusUploadFailed)
return
}
bc.threadPool.Schedule(false, item)
return
}
if bc.tmpPath != "" {
localPath := filepath.Join(bc.tmpPath, fileName)
err := os.MkdirAll(filepath.Dir(localPath), 0777)
if err != nil {
log.Err("BlockCache::upload : error creating directory structure for file %s [%s]", localPath, err.Error())
goto return_safe
}
// Dump this block to local disk cache
f, err := os.Create(localPath)
if err == nil {
_, err := f.Write(item.block.data[0:blockSize])
if err != nil {
log.Err("BlockCache::upload : Failed to write %s to disk [%v]", localPath, err.Error())
_ = os.Remove(localPath)
goto return_safe
}
f.Close()
diskNode, found := bc.fileNodeMap.Load(fileName)
if !found {
diskNode = bc.diskPolicy.Add(fileName)
bc.fileNodeMap.Store(fileName, diskNode)
} else {
bc.diskPolicy.Refresh(diskNode.(*list.Element))
}
// If user has enabled consistency check then compute the md5sum and save it in xattr
if bc.consistency {
hash := common.GetCRC64(item.block.data, int(blockSize))
err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
if err != nil {
log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
}
}
}
}
return_safe:
item.block.flags.Set(BlockFlagSynced)
item.block.NoMoreDirty()
item.block.Ready(BlockStatusUploaded)
}