func()

in component/block_cache/block_cache.go [1485:1560]


func (bc *BlockCache) upload(item *workItem) {
	fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)

	// filename_blockindex is the key for the lock
	// this ensure that at a given time a block from a file is downloaded only once across all open handles
	flock := bc.fileLocks.Get(fileName)
	flock.Lock()
	defer flock.Unlock()
	blockSize := bc.getBlockSize(uint64(item.handle.Size), item.block)
	// This block is updated so we need to stage it now
	err := bc.NextComponent().StageData(internal.StageDataOptions{
		Name:   item.handle.Path,
		Data:   item.block.data[0:blockSize],
		Offset: uint64(item.block.offset),
		Id:     item.blockId})
	if err != nil {
		// Fail to write the data so just reschedule this request
		log.Err("BlockCache::upload : Failed to write %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
		item.failCnt++

		if item.failCnt > MAX_FAIL_CNT {
			// If we failed to write the data 3 times then just give up
			log.Err("BlockCache::upload : 3 attempts to upload a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
			item.block.Failed()
			item.block.Ready(BlockStatusUploadFailed)
			return
		}

		bc.threadPool.Schedule(false, item)
		return
	}

	if bc.tmpPath != "" {
		localPath := filepath.Join(bc.tmpPath, fileName)

		err := os.MkdirAll(filepath.Dir(localPath), 0777)
		if err != nil {
			log.Err("BlockCache::upload : error creating directory structure for file %s [%s]", localPath, err.Error())
			goto return_safe
		}

		// Dump this block to local disk cache
		f, err := os.Create(localPath)
		if err == nil {
			_, err := f.Write(item.block.data[0:blockSize])
			if err != nil {
				log.Err("BlockCache::upload : Failed to write %s to disk [%v]", localPath, err.Error())
				_ = os.Remove(localPath)
				goto return_safe
			}

			f.Close()
			diskNode, found := bc.fileNodeMap.Load(fileName)
			if !found {
				diskNode = bc.diskPolicy.Add(fileName)
				bc.fileNodeMap.Store(fileName, diskNode)
			} else {
				bc.diskPolicy.Refresh(diskNode.(*list.Element))
			}

			// If user has enabled consistency check then compute the md5sum and save it in xattr
			if bc.consistency {
				hash := common.GetCRC64(item.block.data, int(blockSize))
				err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
				if err != nil {
					log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
				}
			}
		}
	}

return_safe:
	item.block.flags.Set(BlockFlagSynced)
	item.block.NoMoreDirty()
	item.block.Ready(BlockStatusUploaded)
}