component/block_cache/block_cache.go (1,387 lines of code) (raw):

/* _____ _____ _____ ____ ______ _____ ------ | | | | | | | | | | | | | | | | | | | | | | | | | | | --- | | | | |-----| |---- | | |-----| |----- ------ | | | | | | | | | | | | | | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ Licensed under the MIT License <http://opensource.org/licenses/MIT>. Copyright © 2020-2025 Microsoft Corporation. All rights reserved. Author : <blobfusedev@microsoft.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE */ package block_cache import ( "bytes" "container/list" "context" "fmt" "io" "math" "os" "path/filepath" "runtime" "sort" "strings" "sync" "syscall" "time" "github.com/Azure/azure-storage-fuse/v2/common" "github.com/Azure/azure-storage-fuse/v2/common/config" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" "github.com/Azure/azure-storage-fuse/v2/internal/handlemap" "github.com/vibhansa-msft/tlru" ) /* NOTES: - Component shall have a structure which inherits "internal.BaseComponent" to participate in pipeline - Component shall register a name and its constructor to participate in pipeline (add by default by generator) - Order of calls : Constructor -> Configure -> Start ..... -> Stop - To read any new setting from config file follow the Configure method default comments */ // Common structure for Component type BlockCache struct { internal.BaseComponent blockSize uint64 // Size of each block to be cached memSize uint64 // Mem size to be used for caching at the startup mntPath string // Mount path tmpPath string // Disk path where these blocks will be cached diskSize uint64 // Size of disk space allocated for the caching diskTimeout uint32 // Timeout for which disk blocks will be cached workers uint32 // Number of threads working to fetch the blocks prefetch uint32 // Number of blocks to be prefetched diskPolicy *tlru.TLRU // Disk cache eviction policy blockPool *BlockPool // Pool of blocks threadPool *ThreadPool // Pool of threads fileLocks *common.LockMap // Locks for each file_blockid to avoid multiple threads to fetch same block fileNodeMap sync.Map // Map holding files that are there in our cache maxDiskUsageHit bool // Flag to indicate if we have hit max 
disk usage noPrefetch bool // Flag to indicate if prefetch is disabled prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read consistency bool // Flag to indicate if strong data consistency is enabled stream *Stream lazyWrite bool // Flag to indicate if lazy write is enabled fileCloseOpt sync.WaitGroup // Wait group to wait for all async close operations to complete cleanupOnStart bool // Clear temp directory on startup } // Structure defining your config parameters type BlockCacheOptions struct { BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"` MemSize uint64 `config:"mem-size-mb" yaml:"mem-size-mb,omitempty"` TmpPath string `config:"path" yaml:"path,omitempty"` DiskSize uint64 `config:"disk-size-mb" yaml:"disk-size-mb,omitempty"` DiskTimeout uint32 `config:"disk-timeout-sec" yaml:"timeout-sec,omitempty"` PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"` Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"` PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"` Consistency bool `config:"consistency" yaml:"consistency,omitempty"` CleanupOnStart bool `config:"cleanup-on-start" yaml:"cleanup-on-start,omitempty"` } const ( compName = "block_cache" defaultTimeout = 120 defaultBlockSize = 16 MAX_POOL_USAGE uint32 = 80 MIN_POOL_USAGE uint32 = 50 MIN_PREFETCH = 5 MIN_WRITE_BLOCK = 3 MIN_RANDREAD = 10 MAX_FAIL_CNT = 3 MAX_BLOCKS = 50000 ) // Verification to check satisfaction criteria with Component Interface var _ internal.Component = &BlockCache{} func (bc *BlockCache) Name() string { return compName } func (bc *BlockCache) SetName(name string) { bc.BaseComponent.SetName(name) } func (bc *BlockCache) SetNextComponent(nc internal.Component) { bc.BaseComponent.SetNextComponent(nc) } // Start : Pipeline calls this method to start the component functionality // // this shall not Block the call otherwise pipeline will not start func (bc *BlockCache) 
Start(ctx context.Context) error { log.Trace("BlockCache::Start : Starting component %s", bc.Name()) bc.blockPool = NewBlockPool(bc.blockSize, bc.memSize) if bc.blockPool == nil { log.Err("BlockCache::Start : failed to init block pool") return fmt.Errorf("config error in %s [failed to init block pool]", bc.Name()) } bc.threadPool = newThreadPool(bc.workers, bc.download, bc.upload) if bc.threadPool == nil { log.Err("BlockCache::Start : failed to init thread pool") return fmt.Errorf("config error in %s [failed to init thread pool]", bc.Name()) } // Start the thread pool and keep it ready for download log.Debug("BlockCache::Start : Starting thread pool") bc.threadPool.Start() // If disk caching is enabled then start the disk eviction policy if bc.tmpPath != "" { err := bc.diskPolicy.Start() if err != nil { log.Err("BlockCache::Start : failed to start diskpolicy [%s]", err.Error()) return fmt.Errorf("failed to start disk-policy for block-cache") } } return nil } // Stop : Stop the component functionality and kill all threads started func (bc *BlockCache) Stop() error { log.Trace("BlockCache::Stop : Stopping component %s", bc.Name()) if bc.lazyWrite { // Wait for all async upload to complete if any log.Info("BlockCache::Stop : Waiting for async close to complete") bc.fileCloseOpt.Wait() } // Wait for thread pool to stop bc.threadPool.Stop() // Clear the disk cache on exit if bc.tmpPath != "" { _ = bc.diskPolicy.Stop() _ = common.TempCacheCleanup(bc.tmpPath) } return nil } // GenConfig : Generate the default config for the component func (bc *BlockCache) GenConfig() string { log.Info("BlockCache::Configure : config generation started") prefetch := uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU()))) memSize := uint32(bc.getDefaultMemSize() / _1MB) if (prefetch * defaultBlockSize) > memSize { prefetch = (MIN_PREFETCH * 2) + 1 } var sb strings.Builder sb.WriteString(fmt.Sprintf("\n%s:", bc.Name())) sb.WriteString(fmt.Sprintf("\n block-size-mb: %v", 
defaultBlockSize)) sb.WriteString(fmt.Sprintf("\n mem-size-mb: %v", memSize)) sb.WriteString(fmt.Sprintf("\n prefetch: %v", prefetch)) sb.WriteString(fmt.Sprintf("\n parallelism: %v", uint32(3*runtime.NumCPU()))) var tmpPath string = "" _ = config.UnmarshalKey("tmp-path", &tmpPath) if tmpPath != "" { sb.WriteString(fmt.Sprintf("\n path: %v", tmpPath)) sb.WriteString(fmt.Sprintf("\n disk-size-mb: %v", bc.getDefaultDiskSize(tmpPath))) sb.WriteString(fmt.Sprintf("\n disk-timeout-sec: %v", defaultTimeout)) } return sb.String() } // Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself // // Return failure if any config is not valid to exit the process func (bc *BlockCache) Configure(_ bool) error { log.Trace("BlockCache::Configure : %s", bc.Name()) if common.IsStream { err := bc.stream.Configure(true) if err != nil { log.Err("BlockCache:Stream::Configure : config error [invalid config attributes]") return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error()) } } conf := BlockCacheOptions{} err := config.UnmarshalKey(bc.Name(), &conf) if err != nil { log.Err("BlockCache::Configure : config error [invalid config attributes]") return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error()) } bc.blockSize = uint64(defaultBlockSize) * _1MB if config.IsSet(compName + ".block-size-mb") { bc.blockSize = uint64(conf.BlockSize * float64(_1MB)) } if config.IsSet(compName + ".mem-size-mb") { bc.memSize = conf.MemSize * _1MB } else { bc.memSize = bc.getDefaultMemSize() } bc.diskTimeout = defaultTimeout if config.IsSet(compName + ".disk-timeout-sec") { bc.diskTimeout = conf.DiskTimeout } bc.consistency = conf.Consistency bc.prefetchOnOpen = conf.PrefetchOnOpen bc.prefetch = uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU()))) bc.noPrefetch = false if (!config.IsSet(compName + ".mem-size-mb")) && (uint64(bc.prefetch)*uint64(bc.blockSize)) > bc.memSize { bc.prefetch = (MIN_PREFETCH * 2) + 1 } err = 
config.UnmarshalKey("lazy-write", &bc.lazyWrite) if err != nil { log.Err("BlockCache: config error [unable to obtain lazy-write]") return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error()) } if config.IsSet(compName + ".prefetch") { bc.prefetch = conf.PrefetchCount if bc.prefetch == 0 { bc.noPrefetch = true } else if conf.PrefetchCount <= (MIN_PREFETCH * 2) { log.Err("BlockCache::Configure : Prefetch count can not be less then %v", (MIN_PREFETCH*2)+1) return fmt.Errorf("config error in %s [invalid prefetch count]", bc.Name()) } } bc.maxDiskUsageHit = false bc.workers = uint32(3 * runtime.NumCPU()) if config.IsSet(compName + ".parallelism") { bc.workers = conf.Workers } bc.tmpPath = common.ExpandPath(conf.TmpPath) bc.cleanupOnStart = conf.CleanupOnStart if bc.tmpPath != "" { //check mnt path is not same as temp path err = config.UnmarshalKey("mount-path", &bc.mntPath) if err != nil { log.Err("BlockCache: config error [unable to obtain Mount Path]") return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error()) } if bc.mntPath == bc.tmpPath { log.Err("BlockCache: config error [tmp-path is same as mount path]") return fmt.Errorf("config error in %s error [tmp-path is same as mount path]", bc.Name()) } // Extract values from 'conf' and store them as you wish here _, err = os.Stat(bc.tmpPath) if os.IsNotExist(err) { log.Info("BlockCache: config error [tmp-path does not exist. 
attempting to create tmp-path.]") err := os.Mkdir(bc.tmpPath, os.FileMode(0755)) if err != nil { log.Err("BlockCache: config error creating directory of temp path after clean [%s]", err.Error()) return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error()) } } else { if bc.cleanupOnStart { err := common.TempCacheCleanup(bc.tmpPath) if err != nil { return fmt.Errorf("error in %s error [fail to cleanup temp cache]", bc.Name()) } } } if !common.IsDirectoryEmpty(bc.tmpPath) { log.Err("BlockCache: config error %s directory is not empty", bc.tmpPath) return fmt.Errorf("config error in %s [%s]", bc.Name(), "temp directory not empty") } bc.diskSize = bc.getDefaultDiskSize(bc.tmpPath) if config.IsSet(compName + ".disk-size-mb") { bc.diskSize = conf.DiskSize * _1MB } } if (uint64(bc.prefetch) * uint64(bc.blockSize)) > bc.memSize { log.Err("BlockCache::Configure : config error [memory limit too low for configured prefetch]") return fmt.Errorf("config error in %s [memory limit too low for configured prefetch]", bc.Name()) } if bc.tmpPath != "" { bc.diskPolicy, err = tlru.New(uint32((bc.diskSize)/bc.blockSize), bc.diskTimeout, bc.diskEvict, 60, bc.checkDiskUsage) if err != nil { log.Err("BlockCache::Configure : fail to create LRU for memory nodes [%s]", err.Error()) return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error()) } } log.Crit("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v, consistency %v", bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch, bc.consistency) return nil } func (bc *BlockCache) getDefaultDiskSize(path string) uint64 { var stat syscall.Statfs_t err := syscall.Statfs(path, &stat) if err != nil { log.Info("BlockCache::getDefaultDiskSize : config error %s [%s]. 
Assigning a default value of 4GB or if any value is assigned to .disk-size-mb in config.", bc.Name(), err.Error()) return uint64(4192) * _1MB } return uint64(0.8 * float64(stat.Bavail) * float64(stat.Bsize)) } func (bc *BlockCache) getDefaultMemSize() uint64 { var sysinfo syscall.Sysinfo_t err := syscall.Sysinfo(&sysinfo) if err != nil { log.Info("BlockCache::getDefaultMemSize : config error %s [%s]. Assigning a pre-defined value of 4GB.", bc.Name(), err.Error()) return uint64(4192) * _1MB } return uint64(0.8 * (float64)(sysinfo.Freeram) * float64(sysinfo.Unit)) } // CreateFile: Create a new file func (bc *BlockCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error) { log.Trace("BlockCache::CreateFile : name=%s, mode=%d", options.Name, options.Mode) _, err := bc.NextComponent().CreateFile(options) if err != nil { log.Err("BlockCache::CreateFile : Failed to create file %s", options.Name) return nil, err } handle := handlemap.NewHandle(options.Name) handle.Size = 0 handle.Mtime = time.Now() // As file is created on storage as well there is no need to mark this as dirty // Any write operation to file will mark it dirty and flush will then reupload // handle.Flags.Set(handlemap.HandleFlagDirty) bc.prepareHandleForBlockCache(handle) return handle, nil } // OpenFile: Create a handle for the file user has requested to open func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error) { log.Trace("BlockCache::OpenFile : name=%s, flags=%X, mode=%s", options.Name, options.Flags, options.Mode) attr, err := bc.NextComponent().GetAttr(internal.GetAttrOptions{Name: options.Name}) if err != nil { log.Err("BlockCache::OpenFile : Failed to get attr of %s [%s]", options.Name, err.Error()) return nil, err } handle := handlemap.NewHandle(options.Name) handle.Mtime = attr.Mtime handle.Size = attr.Size if attr.ETag != "" { handle.SetValue("ETAG", attr.ETag) } log.Debug("BlockCache::OpenFile : Size of file handle.Size %v", 
handle.Size) bc.prepareHandleForBlockCache(handle) if options.Flags&os.O_TRUNC != 0 { // If file is opened in truncate or wronly mode then we need to wipe out the data consider current file size as 0 log.Debug("BlockCache::OpenFile : Truncate %v to 0", options.Name) handle.Size = 0 handle.Flags.Set(handlemap.HandleFlagDirty) } else if handle.Size != 0 && (options.Flags&os.O_RDWR != 0 || options.Flags&os.O_APPEND != 0) { // File is not opened in read-only mode so we need to get the list of blocks and validate the size // As there can be a potential write on this file, currently configured block size and block size of the file in container // has to match otherwise it will corrupt the file. Fail the open call if this is not the case. blockList, err := bc.NextComponent().GetCommittedBlockList(options.Name) if err != nil || blockList == nil { log.Err("BlockCache::OpenFile : Failed to get block list of %s [%v]", options.Name, err) return nil, fmt.Errorf("failed to retrieve block list for %s", options.Name) } valid := bc.validateBlockList(handle, options, blockList) if !valid { return nil, fmt.Errorf("block size mismatch for %s", options.Name) } } if handle.Size > 0 { // This shall be done after the refresh only as this will populate the queues created by above method if handle.Size < int64(bc.blockSize) { // File is small and can fit in one block itself _ = bc.refreshBlock(handle, 0, false) } else if bc.prefetchOnOpen && !bc.noPrefetch { // Prefetch to start on open _ = bc.startPrefetch(handle, 0, false) } } return handle, nil } // validateBlockList: Validates the blockList and populates the blocklist inside the handle for a file. // This method is only called when the file is opened in O_RDWR mode. 
// Each Block's size must equal to blockSize set in config and last block size <= config's blockSize // returns true, if blockList is valid func (bc *BlockCache) validateBlockList(handle *handlemap.Handle, options internal.OpenFileOptions, blockList *internal.CommittedBlockList) bool { lst, _ := handle.GetValue("blockList") listMap := lst.(map[int64]*blockInfo) listLen := len(*blockList) for idx, block := range *blockList { if (idx < (listLen-1) && block.Size != bc.blockSize) || (idx == (listLen-1) && block.Size > bc.blockSize) { log.Err("BlockCache::validateBlockList : Block size mismatch for %s [block: %v, size: %v]", options.Name, block.Id, block.Size) return false } listMap[int64(idx)] = &blockInfo{ id: block.Id, committed: true, size: block.Size, } } return true } func (bc *BlockCache) prepareHandleForBlockCache(handle *handlemap.Handle) { // Allocate a block pool object for this handle // Actual linked list to hold the nodes handle.Buffers = &handlemap.Buffers{ Cooked: list.New(), // List to hold free blocks Cooking: list.New(), // List to hold blocks still under download } // Create map to hold the block-ids for this file listMap := make(map[int64]*blockInfo, 0) handle.SetValue("blockList", listMap) // Set next offset to download as 0 // We may not download this if first read starts with some other offset handle.SetValue("#", (uint64)(0)) } // FlushFile: Flush the local file to storage func (bc *BlockCache) FlushFile(options internal.FlushFileOptions) error { log.Trace("BlockCache::FlushFile : handle=%d, path=%s", options.Handle.ID, options.Handle.Path) if bc.lazyWrite && !options.CloseInProgress { // As lazy-write is enable, upload will be scheduled when file is closed. 
log.Info("BlockCache::FlushFile : %s will be flushed when handle %d is closed", options.Handle.Path, options.Handle.ID) return nil } options.Handle.Lock() defer options.Handle.Unlock() // call commit blocks only if the handle is dirty if options.Handle.Dirty() { err := bc.commitBlocks(options.Handle) if err != nil { log.Err("BlockCache::FlushFile : Failed to commit blocks for %s [%s]", options.Handle.Path, err.Error()) return err } } return nil } // CloseFile: File is closed by application so release all the blocks and submit back to blockPool func (bc *BlockCache) CloseFile(options internal.CloseFileOptions) error { bc.fileCloseOpt.Add(1) if !bc.lazyWrite { // Sync close is called so wait till the upload completes return bc.closeFileInternal(options) } // Async close is called so schedule the upload and return here go bc.closeFileInternal(options) //nolint return nil } // closeFileInternal: Actual handling of the close file goes here func (bc *BlockCache) closeFileInternal(options internal.CloseFileOptions) error { log.Trace("BlockCache::CloseFile : name=%s, handle=%d", options.Handle.Path, options.Handle.ID) defer bc.fileCloseOpt.Done() if options.Handle.Dirty() { log.Info("BlockCache::CloseFile : name=%s, handle=%d dirty. 
Flushing the file.", options.Handle.Path, options.Handle.ID) err := bc.FlushFile(internal.FlushFileOptions{Handle: options.Handle, CloseInProgress: true}) //nolint if err != nil { log.Err("BlockCache::CloseFile : failed to flush file %s", options.Handle.Path) return err } } // Release the blocks that are in use and wipe out handle map options.Handle.Cleanup() // Release the buffers which are still under download after they have been written blockList := options.Handle.Buffers.Cooking node := blockList.Front() for ; node != nil; node = blockList.Front() { // Due to prefetch there might be some downloads still going on block := blockList.Remove(node).(*Block) // Wait for download to complete and then free up this block <-block.state block.node = nil block.ReUse() bc.blockPool.Release(block) } options.Handle.Buffers.Cooking = nil // Release the blocks that are ready to be reused blockList = options.Handle.Buffers.Cooked node = blockList.Front() for ; node != nil; node = blockList.Front() { block := blockList.Remove(node).(*Block) // block.Unblock() block.node = nil block.ReUse() bc.blockPool.Release(block) } options.Handle.Buffers.Cooked = nil return nil } func (bc *BlockCache) getBlockSize(fileSize uint64, block *Block) uint64 { return min(bc.blockSize, fileSize-block.offset) } // ReadInBuffer: Read the file into a buffer func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, error) { if options.Offset >= options.Handle.Size { // EOF reached so early exit return 0, io.EOF } // As of now we allow only one operation on a handle at a time // This simplifies the logic of block-cache otherwise we will have to handle // a lot of race conditions and logic becomes complex and sub-performant options.Handle.Lock() defer options.Handle.Unlock() // Keep getting next blocks until you read the request amount of data dataRead := int(0) for dataRead < len(options.Data) { block, err := bc.getBlock(options.Handle, uint64(options.Offset)) if err != nil { if err 
!= io.EOF { log.Err("BlockCache::ReadInBuffer : Failed to get Block %v=>%s offset %v [%v]", options.Handle.ID, options.Handle.Path, options.Offset, err.Error()) } return dataRead, err } // Copy data from this block to user buffer readOffset := uint64(options.Offset) - block.offset blockSize := bc.getBlockSize(uint64(options.Handle.Size), block) bytesRead := copy(options.Data[dataRead:], block.data[readOffset:blockSize]) // Move offset forward in case we need to copy more data options.Offset += int64(bytesRead) dataRead += bytesRead if options.Offset >= options.Handle.Size { // EOF reached so early exit return dataRead, io.EOF } } return dataRead, nil } func (bc *BlockCache) addToCooked(handle *handlemap.Handle, block *Block) { if block.node != nil { _ = handle.Buffers.Cooking.Remove(block.node) _ = handle.Buffers.Cooked.Remove(block.node) } block.node = handle.Buffers.Cooked.PushBack(block) } func (bc *BlockCache) addToCooking(handle *handlemap.Handle, block *Block) { if block.node != nil { _ = handle.Buffers.Cooked.Remove(block.node) _ = handle.Buffers.Cooking.Remove(block.node) } block.node = handle.Buffers.Cooking.PushBack(block) } // getBlock: From offset generate the Block index and get the Block corresponding to it /* Base logic of getBlock: Check if the given block is already available or not if not if this is the first read for this file start prefetching of blocks from given offset if this is not first read, consider this to be a random read case and start prefetch from given offset once the random read count reaches a limit, this prefetching will be turned off in either case this prefetching will add the block index to the map so search the map again now Once block is available if you are first reader of this block its time to prefetch next block(s) based on how much we can prefetch Once you queue up the required prefetch mark this block as open to read so that others can come and freely read this block First reader here has responsibility to remove an 
old used block and lineup download for next blocks Return this block once prefetch is queued and block is marked open for all */ func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Block, error) { if readoffset >= uint64(handle.Size) { return nil, io.EOF } // Check the given block index is already available or not index := bc.getBlockIndex(readoffset) node, found := handle.GetValue(fmt.Sprintf("%v", index)) if !found { // block is not present in the buffer list, check if it is uncommitted // If yes, commit all the uncommitted blocks first and then download this block shouldCommit, shouldDownload := shouldCommitAndDownload(int64(index), handle) if shouldCommit { // commit all the uncommitted blocks to storage log.Debug("BlockCache::getBlock : Downloading an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path) err := bc.commitBlocks(handle) if err != nil { log.Err("BlockCache::getBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error()) return nil, err } } else if !shouldCommit && !shouldDownload { prop, err := bc.GetAttr(internal.GetAttrOptions{Name: handle.Path, RetrieveMetadata: false}) //if failed to get attr if err != nil { log.Err("BlockCache::getBlock : Failed to get properties for %v=>%s [%s]", handle.ID, handle.Path, err.Error()) return nil, err } if readoffset >= uint64(prop.Size) { //create a null block and return block := bc.blockPool.MustGet() block.offset = readoffset // block.flags.Set(BlockFlagSynced) log.Debug("BlockCache::getBlock : Returning a null block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset) return block, nil } } // If this is the first read request then prefetch all required nodes val, _ := handle.GetValue("#") if !bc.noPrefetch && val.(uint64) == 0 { log.Debug("BlockCache::getBlock : Starting the prefetch %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index) // This is the first read for 
this file handle so start prefetching all the nodes err := bc.startPrefetch(handle, index, false) if err != nil && err != io.EOF { log.Err("BlockCache::getBlock : Unable to start prefetch %v=>%s (offset %v, index %v) [%s]", handle.ID, handle.Path, readoffset, index, err.Error()) return nil, err } } else { // This is a case of random read so increment the random read count handle.OptCnt++ log.Debug("BlockCache::getBlock : Unable to get block %v=>%s (offset %v, index %v) Random %v", handle.ID, handle.Path, readoffset, index, handle.OptCnt) // This block is not present even after prefetch so lets download it now err := bc.startPrefetch(handle, index, false) if err != nil && err != io.EOF { log.Err("BlockCache::getBlock : Unable to start prefetch %v=>%s (offset %v, index %v) [%s]", handle.ID, handle.Path, readoffset, index, err.Error()) return nil, err } } // This node was not found so above logic should have queued it up, retry searching now node, found = handle.GetValue(fmt.Sprintf("%v", index)) if !found { log.Err("BlockCache::getBlock : Failed to get the required block %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index) return nil, fmt.Errorf("not able to find block immediately after scheudling") } } // We have the block now which we wish to read block := node.(*Block) // Wait for this block to complete the download t, ok := <-block.state if ok { // this block is now open to read and process block.Unblock() switch t { case BlockStatusDownloaded: log.Debug("BlockCache::getBlock : Downloaded block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset) block.flags.Clear(BlockFlagDownloading) // Download complete and you are first reader of this block if !bc.noPrefetch && handle.OptCnt <= MIN_RANDREAD { // So far this file has been read sequentially so prefetch more val, _ := handle.GetValue("#") if int64(val.(uint64)*bc.blockSize) < handle.Size { _ = bc.startPrefetch(handle, val.(uint64), true) } } // This block was moved to 
in-process queue as download is complete lets move it back to normal queue bc.addToCooked(handle, block) // mark this block as synced so that if it can used for write later // which will move it back to cooking list as per the synced flag block.flags.Set(BlockFlagSynced) case BlockStatusUploaded: log.Debug("BlockCache::getBlock : Staged block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset) block.flags.Clear(BlockFlagUploading) case BlockStatusDownloadFailed: log.Err("BlockCache::getBlock : Failed to download block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset) // Remove this node from handle so that next read retries to download the block again bc.releaseDownloadFailedBlock(handle, block) return nil, fmt.Errorf("failed to download block") case BlockStatusUploadFailed: // Local data is still valid so continue using this buffer log.Err("BlockCache::getBlock : Failed to upload block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset) block.flags.Clear(BlockFlagUploading) // Move this block to end of queue as this is still modified and un-staged bc.addToCooking(handle, block) } } return block, nil } // getBlockIndex: From offset get the block index func (bc *BlockCache) getBlockIndex(offset uint64) uint64 { return offset / bc.blockSize } // startPrefetch: Start prefetchign the blocks from given offset. Same method is used to download currently required block as well func (bc *BlockCache) startPrefetch(handle *handlemap.Handle, index uint64, prefetch bool) error { // Calculate how many buffers we have in free and in-process queue currentCnt := handle.Buffers.Cooked.Len() + handle.Buffers.Cooking.Len() cnt := uint32(0) if handle.OptCnt > MIN_RANDREAD { // This handle has been read randomly and we have reached the threshold to declare a random read case if currentCnt > MIN_PREFETCH { // As this file is in random read mode now, release the excess buffers. 
Just keep 5 buffers for it to work log.Info("BlockCache::startPrefetch : Cleanup excessive blocks %v=>%s index %v", handle.ID, handle.Path, index) // As this is random read move all in process blocks to free list nodeList := handle.Buffers.Cooking currentCnt = nodeList.Len() node := nodeList.Front() for i := 0; node != nil && i < currentCnt; node = nodeList.Front() { // Test whether this block is already downloaded or still under download block := handle.Buffers.Cooking.Remove(node).(*Block) block.node = nil i++ //This list may contain dirty blocks which are yet to be committed. select { case _, ok := <-block.state: // As we are first reader of this block here its important to unblock any future readers on this block if ok { block.flags.Clear(BlockFlagDownloading) block.Unblock() // Block is downloaded so it's safe to ready it for reuse block.node = handle.Buffers.Cooked.PushBack(block) } else { block.node = handle.Buffers.Cooking.PushBack(block) } default: // Block is still under download so can not reuse this block.node = handle.Buffers.Cooking.PushBack(block) } } // Now remove excess blocks from cooked list nodeList = handle.Buffers.Cooked currentCnt = nodeList.Len() node = nodeList.Front() for ; node != nil && currentCnt > MIN_PREFETCH; node = nodeList.Front() { block := node.Value.(*Block) _ = nodeList.Remove(node) // Remove entry of this block from map so that no one can find it handle.RemoveValue(fmt.Sprintf("%v", block.id)) block.node = nil // Submit this block back to pool for reuse block.ReUse() bc.blockPool.Release(block) currentCnt-- } } // As we were asked to download a block, for random read case download only the requested block // This is where prefetching is blocked now as we download just the block which is requested cnt = 1 } else { // This handle is having sequential reads so far // Allocate more buffers if required until we hit the prefetch count limit for ; currentCnt < int(bc.prefetch) && cnt < MIN_PREFETCH; currentCnt++ { block := 
bc.blockPool.TryGet() if block != nil { block.node = handle.Buffers.Cooked.PushFront(block) cnt++ } } // If no new buffers were allocated then we have all buffers allocated to this handle already // time to switch to a sliding window where we remove one block and lineup a new block for download if cnt == 0 { cnt = 1 } } for i := uint32(0); i < cnt; i++ { // Check if the block exists in the local cache or not // If not, download the block from storage _, found := handle.GetValue(fmt.Sprintf("%v", index)) if !found { // Check if the block is an uncommitted block or not // For uncommitted block we need to commit the block first shouldCommit, _ := shouldCommitAndDownload(int64(index), handle) if shouldCommit { // This shall happen only for the first uncommitted block and shall flush all the uncommitted blocks to storage log.Debug("BlockCache::startPrefetch : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path) err := bc.commitBlocks(handle) if err != nil { log.Err("BlockCache::startPrefetch : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error()) return err } } // push the block for download err := bc.refreshBlock(handle, index, prefetch || i > 0) if err != nil { return err } index++ } } return nil } // refreshBlock: Get a block from the list and prepare it for download func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefetch bool) error { log.Trace("BlockCache::refreshBlock : Request to download %v=>%s (index %v, prefetch %v)", handle.ID, handle.Path, index, prefetch) // Convert index to offset offset := index * bc.blockSize if int64(offset) >= handle.Size { // We have reached EOF so return back no need to download anything here return io.EOF } nodeList := handle.Buffers.Cooked if nodeList.Len() == 0 && !prefetch { // User needs a block now but there is no free block available right now // this might happen when all blocks are under download and no first 
reader is hit for any of them block := bc.blockPool.MustGet() if block == nil { log.Err("BlockCache::refreshBlock : Unable to allocate block %v=>%s (index %v, prefetch %v)", handle.ID, handle.Path, index, prefetch) return fmt.Errorf("unable to allocate block") } block.node = handle.Buffers.Cooked.PushFront(block) } node := nodeList.Front() if node != nil { // Now there is at least one free block available in the list block := node.Value.(*Block) if block.id != -1 { // If the block is being staged, then wait till it is uploaded // and then use it for read if block.flags.IsSet(BlockFlagUploading) { log.Debug("BlockCache::refreshBlock : Waiting for the block %v to upload before using it for block %v read for %v=>%s", block.id, index, handle.ID, handle.Path) _, ok := <-block.state if ok { block.Unblock() } } // This is a reuse of a block case so we need to remove old entry from the map handle.RemoveValue(fmt.Sprintf("%v", block.id)) } // Reuse this block and lineup for download block.ReUse() block.id = int64(index) block.offset = offset // Add this entry to handle map so that others can refer to the same block if required handle.SetValue(fmt.Sprintf("%v", index), block) handle.SetValue("#", (index + 1)) bc.lineupDownload(handle, block, prefetch) } return nil } // lineupDownload : Create a work item and schedule the download func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool) { IEtag, found := handle.GetValue("ETAG") Etag := "" if found { Etag = IEtag.(string) } item := &workItem{ handle: handle, block: block, prefetch: prefetch, failCnt: 0, upload: false, ETag: Etag, } // Remove this block from free block list and add to in-process list bc.addToCooking(handle, block) block.flags.Set(BlockFlagDownloading) // Send the work item to worker pool to schedule download bc.threadPool.Schedule(!prefetch, item) } // download : Method to download the given amount of data func (bc *BlockCache) download(item *workItem) { fileName := 
fmt.Sprintf("%s::%v", item.handle.Path, item.block.id) // filename_blockindex is the key for the lock // this ensure that at a given time a block from a file is downloaded only once across all open handles flock := bc.fileLocks.Get(fileName) flock.Lock() defer flock.Unlock() var diskNode any found := false localPath := "" if bc.tmpPath != "" { // Update diskpolicy to reflect the new file diskNode, found = bc.fileNodeMap.Load(fileName) if !found { diskNode = bc.diskPolicy.Add(fileName) bc.fileNodeMap.Store(fileName, diskNode) } else { bc.diskPolicy.Refresh(diskNode.(*list.Element)) } // Check local file exists for this offset and file combination or not localPath = filepath.Join(bc.tmpPath, fileName) _, err := os.Stat(localPath) if err == nil { // If file exists then read the block from the local file f, err := os.Open(localPath) if err != nil { // On any disk failure we do not fail the download flow log.Err("BlockCache::download : Failed to open file %s [%s]", fileName, err.Error()) _ = os.Remove(localPath) } else { var successfulRead bool = true numberOfBytes, err := f.Read(item.block.data) if err != nil { log.Err("BlockCache::download : Failed to read data from disk cache %s [%s]", fileName, err.Error()) successfulRead = false _ = os.Remove(localPath) } if numberOfBytes != int(bc.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) { log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size) successfulRead = false _ = os.Remove(localPath) } f.Close() if successfulRead { // If user has enabled consistency check then compute the md5sum and match it in xattr successfulRead = checkBlockConsistency(bc, item, numberOfBytes, localPath, fileName) // We have read the data from disk so there is no need to go over network // Just mark the block that download is complete if successfulRead { 
item.block.Ready(BlockStatusDownloaded) return } } } } } var etag string // If file does not exists then download the block from the container n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{ Handle: item.handle, Offset: int64(item.block.offset), Data: item.block.data, Etag: &etag, }) if item.failCnt > MAX_FAIL_CNT { // If we failed to read the data 3 times then just give up log.Err("BlockCache::download : 3 attempts to download a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset) item.block.Failed() item.block.Ready(BlockStatusDownloadFailed) return } if err != nil && err != io.EOF { // Fail to read the data so just reschedule this request log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error()) item.failCnt++ bc.threadPool.Schedule(false, item) return } else if n == 0 { // No data read so just reschedule this request log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [0 bytes read]", item.handle.ID, item.handle.Path, item.block.id) item.failCnt++ bc.threadPool.Schedule(false, item) return } // Compare the ETAG value and fail download if blob has changed if etag != "" { if item.ETag != "" && item.ETag != etag { log.Err("BlockCache::download : Blob has changed for %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset) item.block.Failed() item.block.Ready(BlockStatusDownloadFailed) return } } if bc.tmpPath != "" { err := os.MkdirAll(filepath.Dir(localPath), 0777) if err != nil { log.Err("BlockCache::download : error creating directory structure for file %s [%s]", localPath, err.Error()) return } // Dump this block to local disk cache f, err := os.Create(localPath) if err == nil { _, err := f.Write(item.block.data[:n]) if err != nil { log.Err("BlockCache::download : Failed to write %s to disk [%v]", localPath, err.Error()) _ = 
os.Remove(localPath) } f.Close() bc.diskPolicy.Refresh(diskNode.(*list.Element)) // If user has enabled consistency check then compute the md5sum and save it in xattr if bc.consistency { hash := common.GetCRC64(item.block.data, n) err = syscall.Setxattr(localPath, "user.md5sum", hash, 0) if err != nil { log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error()) } } } } // Just mark the block that download is complete item.block.Ready(BlockStatusDownloaded) } func checkBlockConsistency(blockCache *BlockCache, item *workItem, numberOfBytes int, localPath, fileName string) bool { if !blockCache.consistency { return true } // Calculate MD5 checksum of the read data actualHash := common.GetCRC64(item.block.data, numberOfBytes) // Retrieve MD5 checksum from xattr xattrHash := make([]byte, 8) _, err := syscall.Getxattr(localPath, "user.md5sum", xattrHash) if err != nil { log.Err("BlockCache::download : Failed to get md5sum for file %s [%v]", fileName, err.Error()) } else { // Compare checksums if !bytes.Equal(actualHash, xattrHash) { log.Err("BlockCache::download : MD5 checksum mismatch for file %s, expected %v, got %v", fileName, xattrHash, actualHash) _ = os.Remove(localPath) return false } } return true } // WriteFile: Write to the local file func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error) { // log.Debug("BlockCache::WriteFile : Writing %v bytes from %s", len(options.Data), options.Handle.Path) options.Handle.Lock() defer options.Handle.Unlock() // log.Debug("BlockCache::WriteFile : Writing handle %v=>%v: offset %v, %v bytes", options.Handle.ID, options.Handle.Path, options.Offset, len(options.Data)) // Keep getting next blocks until you read the request amount of data dataWritten := int(0) for dataWritten < len(options.Data) { block, err := bc.getOrCreateBlock(options.Handle, uint64(options.Offset)) if err != nil { // Failed to get block for writing log.Err("BlockCache::WriteFile : Unable to allocate 
// getOrCreateBlock returns the block that covers the given byte offset of the handle, creating it
// when the handle does not hold it yet.
//
// New block: a block is taken from the pool (after trying to free one uploaded block when the
// handle already holds at least bc.prefetch buffers). If the block lies inside the current file
// size, shouldCommitAndDownload decides whether staged blocks must be committed first and whether
// the existing content has to be downloaded before it can be overwritten; otherwise the block is
// simply appended to the cooking list. Once the cooking list grows beyond MIN_WRITE_BLOCK one
// dirty block is staged.
//
// Existing block: waits for an in-flight download or upload of that block, then moves it back to
// the cooking list and clears its uploading/downloading/synced flags.
//
// Returns an error when the block index is >= MAX_BLOCKS, when no block can be allocated, when
// committing staged blocks fails, or when the download of the block fails.
func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) (*Block, error) {
	// Check the given block index is already available or not
	index := bc.getBlockIndex(offset)
	if index >= MAX_BLOCKS {
		log.Err("BlockCache::getOrCreateBlock : Failed to get Block %v=>%s offset %v", handle.ID, handle.Path, offset)
		return nil, fmt.Errorf("block index out of range. Increase your block size")
	}

	// log.Debug("FilBlockCacheCache::getOrCreateBlock : Get block for %s, index %v", handle.Path, index)

	var block *Block
	var err error

	// Blocks held by this handle are registered in the handle map under their index
	node, found := handle.GetValue(fmt.Sprintf("%v", index))
	if !found {
		// If too many buffers are piled up for this file then try to evict some of those which are already uploaded
		if handle.Buffers.Cooked.Len()+handle.Buffers.Cooking.Len() >= int(bc.prefetch) {
			bc.waitAndFreeUploadedBlocks(handle, 1)
		}

		// Either the block is not fetched yet or offset goes beyond the file size
		block = bc.blockPool.MustGet()
		if block == nil {
			log.Err("BlockCache::getOrCreateBlock : Unable to allocate block %v=>%s (index %v)", handle.ID, handle.Path, index)
			return nil, fmt.Errorf("unable to allocate block")
		}

		block.node = nil
		block.id = int64(index)
		block.offset = index * bc.blockSize

		if block.offset < uint64(handle.Size) {
			// The block starts inside the current file size, so it may already hold data
			shouldCommit, shouldDownload := shouldCommitAndDownload(block.id, handle)

			// if a block has been staged and deleted from the buffer list, then we should commit the existing blocks
			// commit the dirty blocks and download the given block
			if shouldCommit {
				log.Debug("BlockCache::getOrCreateBlock : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", block.id, handle.ID, handle.Path)
				err = bc.commitBlocks(handle)
				if err != nil {
					log.Err("BlockCache::getOrCreateBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
					return nil, err
				}
			}

			// download the block if,
			//    - it was already committed, or
			//    - it was committed by the above commit blocks operation
			if shouldDownload || shouldCommit {
				// We are writing somewhere in between so just fetch this block
				log.Debug("BlockCache::getOrCreateBlock : Downloading block %v for %v=>%v", block.id, handle.ID, handle.Path)
				bc.lineupDownload(handle, block, false)

				// Now wait for download to complete
				<-block.state

				// if the block failed to download, it can't be used for overwriting
				if block.IsFailed() {
					log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

					// Remove this node from handle so that next read retries to download the block again
					bc.releaseDownloadFailedBlock(handle, block)
					return nil, fmt.Errorf("failed to download block")
				}
			} else {
				// Block index is not present in the block list, so there is nothing to fetch
				log.Debug("BlockCache::getOrCreateBlock : push block %v to the cooking list for %v=>%v", block.id, handle.ID, handle.Path)
				block.node = handle.Buffers.Cooking.PushBack(block)
			}
		} else {
			// Block starts at or beyond the current file size, so there is no existing data to fetch
			block.node = handle.Buffers.Cooking.PushBack(block)
		}

		// Register the block so that later reads/writes on this handle find it
		handle.SetValue(fmt.Sprintf("%v", index), block)
		block.flags.Clear(BlockFlagDownloading)
		block.Unblock()

		// As we are creating new blocks here, we need to push the block for upload and remove them from list here
		if handle.Buffers.Cooking.Len() > MIN_WRITE_BLOCK {
			err = bc.stageBlocks(handle, 1)
			if err != nil {
				// Staging failure is only logged here; the block is still returned to the caller
				log.Err("BlockCache::getOrCreateBlock : Unable to stage blocks for %s [%s]", handle.Path, err.Error())
			}
		}

	} else {
		// We have the block now which we wish to write
		block = node.(*Block)

		// If the block was staged earlier then we are overwriting it here so move it back to cooking queue
		if block.flags.IsSet(BlockFlagSynced) {
			log.Debug("BlockCache::getOrCreateBlock : Overwriting back to staged block %v for %v=>%s", block.id, handle.ID, handle.Path)

		} else if block.flags.IsSet(BlockFlagDownloading) {
			log.Debug("BlockCache::getOrCreateBlock : Waiting for download to finish for committed block %v for %v=>%s", block.id, handle.ID, handle.Path)
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}

			// if the block failed to download, it can't be used for overwriting
			if block.IsFailed() {
				log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

				// Remove this node from handle so that next read retries to download the block again
				bc.releaseDownloadFailedBlock(handle, block)
				return nil, fmt.Errorf("failed to download block")
			}
		} else if block.flags.IsSet(BlockFlagUploading) {
			// If the block is being staged, then wait till it is uploaded,
			// and then write to the same block and move it back to cooking queue
			log.Debug("BlockCache::getOrCreateBlock : Waiting for the block %v to upload for %v=>%s", block.id, handle.ID, handle.Path)
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}
		}

		bc.addToCooking(handle, block)

		// The block is about to be modified, so none of the transfer/sync states apply any more
		block.flags.Clear(BlockFlagUploading)
		block.flags.Clear(BlockFlagDownloading)
		block.flags.Clear(BlockFlagSynced)
	}

	return block, nil
}

// Stage the given number of blocks from this handle
//
// Walks the cooking list from the front and lines up an upload for every dirty block until cnt
// uploads have been scheduled or the list is exhausted. The handle must carry a "blockList" value
// of type map[int64]*blockInfo; the type assertion panics otherwise. The returned error is
// always nil.
func (bc *BlockCache) stageBlocks(handle *handlemap.Handle, cnt int) error {
	//log.Debug("BlockCache::stageBlocks : Staging blocks for %s, cnt %v", handle.Path, cnt)

	nodeList := handle.Buffers.Cooking
	node := nodeList.Front()

	lst, _ := handle.GetValue("blockList")
	listMap := lst.(map[int64]*blockInfo)

	for node != nil && cnt > 0 {
		// Fetch the successor before the block is lined up for upload
		// NOTE(review): lineupUpload calls addToCooked, which presumably relinks the node — confirm
		nextNode := node.Next()
		block := node.Value.(*Block)

		if block.IsDirty() {
			bc.lineupUpload(handle, block, listMap)
			cnt--
		}

		node = nextNode
	}

	return nil
}

// remove the block which failed to download so that it can be used again
//
// The block is unlinked from the handle's buffer lists, its entry is dropped from the handle map
// and the block is handed back to the pool.
func (bc *BlockCache) releaseDownloadFailedBlock(handle *handlemap.Handle, block *Block) {
	if block.node != nil {
		// Remove is attempted on both lists; container/list ignores an element that does not
		// belong to the list it is called on, so only the owning list is modified
		_ = handle.Buffers.Cooking.Remove(block.node)
		_ = handle.Buffers.Cooked.Remove(block.node)
	}

	handle.RemoveValue(fmt.Sprintf("%v", block.id))
	block.node = nil
	block.ReUse()
	bc.blockPool.Release(block)
}
handle.Path, cookingId, cookedId) } // shouldCommitAndDownload is used to check if we should commit the existing blocks and download the given block. // There can be a case where a block has been partially written, staged and cleared from the buffer list. // If write call comes for that block, we cannot get the previous staged data // since the block is not yet committed. So, we have to commit it. // If the block is staged and cleared from the buffer list, return true for commit and false for downloading. // if the block is already committed, return false for commit and true for downloading. func shouldCommitAndDownload(blockID int64, handle *handlemap.Handle) (bool, bool) { lst, ok := handle.GetValue("blockList") if !ok { return false, false } listMap := lst.(map[int64]*blockInfo) val, ok := listMap[blockID] if ok { // block id exists // If block is staged, return true for commit and false for downloading // If block is committed, return false for commit and true for downloading return !val.committed, val.committed } else { return false, false } } // lineupUpload : Create a work item and schedule the upload func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listMap map[int64]*blockInfo) { id := common.GetBlockID(common.BlockIDLength) listMap[block.id] = &blockInfo{ id: id, committed: false, size: bc.getBlockSize(uint64(handle.Size), block), } log.Debug("BlockCache::lineupUpload : block %v, size %v for %v=>%s, blockId %v", block.id, bc.getBlockSize(uint64(handle.Size), block), handle.ID, handle.Path, id) item := &workItem{ handle: handle, block: block, prefetch: false, failCnt: 0, upload: true, blockId: id, } block.Uploading() block.flags.Clear(BlockFlagFailed) block.flags.Set(BlockFlagUploading) // Remove this block from free block list and add to in-process list bc.addToCooked(handle, block) // Send the work item to worker pool to schedule download bc.threadPool.Schedule(false, item) } func (bc *BlockCache) waitAndFreeUploadedBlocks(handle 
*handlemap.Handle, cnt int) { nodeList := handle.Buffers.Cooked node := nodeList.Front() nextNode := node wipeoutBlock := false if cnt == 1 { wipeoutBlock = true } for nextNode != nil && cnt > 0 { node = nextNode nextNode = node.Next() block := node.Value.(*Block) if block.id != -1 { // Wait for upload of this block to complete _, ok := <-block.state block.flags.Clear(BlockFlagDownloading) block.flags.Clear(BlockFlagUploading) if ok { block.Unblock() } } else { block.Unblock() } if block.IsFailed() { log.Err("BlockCache::waitAndFreeUploadedBlocks : Failed to upload block, posting back to cooking list %v=>%s (index %v, offset %v)", handle.ID, handle.Path, block.id, block.offset) bc.addToCooking(handle, block) continue } cnt-- if wipeoutBlock || block.id == -1 { log.Debug("BlockCache::waitAndFreeUploadedBlocks : Block cleanup for block %v=>%s (index %v, offset %v)", handle.ID, handle.Path, block.id, block.offset) handle.RemoveValue(fmt.Sprintf("%v", block.id)) nodeList.Remove(node) block.node = nil block.ReUse() bc.blockPool.Release(block) } } } // upload : Method to stage the given amount of data func (bc *BlockCache) upload(item *workItem) { fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id) // filename_blockindex is the key for the lock // this ensure that at a given time a block from a file is downloaded only once across all open handles flock := bc.fileLocks.Get(fileName) flock.Lock() defer flock.Unlock() blockSize := bc.getBlockSize(uint64(item.handle.Size), item.block) // This block is updated so we need to stage it now err := bc.NextComponent().StageData(internal.StageDataOptions{ Name: item.handle.Path, Data: item.block.data[0:blockSize], Offset: uint64(item.block.offset), Id: item.blockId}) if err != nil { // Fail to write the data so just reschedule this request log.Err("BlockCache::upload : Failed to write %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error()) item.failCnt++ if item.failCnt > 
MAX_FAIL_CNT { // If we failed to write the data 3 times then just give up log.Err("BlockCache::upload : 3 attempts to upload a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset) item.block.Failed() item.block.Ready(BlockStatusUploadFailed) return } bc.threadPool.Schedule(false, item) return } if bc.tmpPath != "" { localPath := filepath.Join(bc.tmpPath, fileName) err := os.MkdirAll(filepath.Dir(localPath), 0777) if err != nil { log.Err("BlockCache::upload : error creating directory structure for file %s [%s]", localPath, err.Error()) goto return_safe } // Dump this block to local disk cache f, err := os.Create(localPath) if err == nil { _, err := f.Write(item.block.data[0:blockSize]) if err != nil { log.Err("BlockCache::upload : Failed to write %s to disk [%v]", localPath, err.Error()) _ = os.Remove(localPath) goto return_safe } f.Close() diskNode, found := bc.fileNodeMap.Load(fileName) if !found { diskNode = bc.diskPolicy.Add(fileName) bc.fileNodeMap.Store(fileName, diskNode) } else { bc.diskPolicy.Refresh(diskNode.(*list.Element)) } // If user has enabled consistency check then compute the md5sum and save it in xattr if bc.consistency { hash := common.GetCRC64(item.block.data, int(blockSize)) err = syscall.Setxattr(localPath, "user.md5sum", hash, 0) if err != nil { log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error()) } } } } return_safe: item.block.flags.Set(BlockFlagSynced) item.block.NoMoreDirty() item.block.Ready(BlockStatusUploaded) } // Stage the given number of blocks from this handle // handle lock must be taken before calling this function func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error { log.Debug("BlockCache::commitBlocks : Staging blocks for %s", handle.Path) // Make three attempts to upload all pending blocks cnt := 0 for cnt = 0; cnt < 3; cnt++ { if handle.Buffers.Cooking.Len() == 0 { break } err := bc.stageBlocks(handle, 
MAX_BLOCKS) if err != nil { log.Err("BlockCache::commitBlocks : Failed to stage blocks for %s [%s]", handle.Path, err.Error()) return err } bc.waitAndFreeUploadedBlocks(handle, MAX_BLOCKS) } if cnt == 3 { nodeList := handle.Buffers.Cooking node := nodeList.Front() for node != nil { block := node.Value.(*Block) node = node.Next() if block.IsDirty() { log.Err("BlockCache::commitBlocks : Failed to stage blocks for %s after 3 attempts", handle.Path) return fmt.Errorf("failed to stage blocks") } } } blockIDList, restageIds, err := bc.getBlockIDList(handle) if err != nil { log.Err("BlockCache::commitBlocks : Failed to get block id list for %v [%v]", handle.Path, err.Error()) return err } log.Debug("BlockCache::commitBlocks : Committing blocks for %s", handle.Path) // Commit the block list now var newEtag string = "" err = bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIDList, BlockSize: bc.blockSize, NewETag: &newEtag}) if err != nil { log.Err("BlockCache::commitBlocks : Failed to commit blocks for %s [%s]", handle.Path, err.Error()) return err } // Lock was already acquired on the handle. 
if newEtag != "" { handle.SetValue("ETAG", newEtag) } // set all the blocks as committed list, _ := handle.GetValue("blockList") listMap := list.(map[int64]*blockInfo) for k := range listMap { listMap[k].committed = true } restaged := false for _, restageID := range restageIds { // We need to restage these blocks for i := range blockIDList { if blockIDList[i] == restageID { // Read one block from offset of this block, which shall effectively read this block and the next block // The stage this block again with correct length // Remove the next block from blockIDList // Commit the block list again block, err := bc.getOrCreateBlock(handle, uint64(i)*bc.blockSize) if err != nil { log.Err("BlockCache::commitBlocks : Failed to get block for %v [%v]", handle.Path, err.Error()) return err } block.Dirty() restaged = true // Next item after this block was a semi zero filler so remove that from the list now blockIDList = append(blockIDList[:i+1], blockIDList[i+2:]...) break } } } if restaged { // If any block was restaged then commit the blocks again return bc.commitBlocks(handle) } handle.Flags.Clear(handlemap.HandleFlagDirty) return nil } func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, []string, error) { // generate the block id list order list, _ := handle.GetValue("blockList") listMap := list.(map[int64]*blockInfo) offsets := make([]int64, 0) blockIDList := make([]string, 0) for k := range listMap { offsets = append(offsets, k) } sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] }) zeroBlockStaged := false zeroBlockID := "" restageId := make([]string, 0) index := int64(0) i := 0 for i < len(offsets) { if index == offsets[i] { if i != len(offsets)-1 && listMap[offsets[i]].size != bc.blockSize { // A non last block was staged earlier and it is not of the same size as block size // This happens when a block which is not full is staged and at that moment it was the last block // Now we have written data beyond that point and 
its no longer the last block // In such case we need to fill the gap with zero blocks // For simplicity we will fill the gap with a new block and later merge both these blocks in one block id := common.GetBlockID(common.BlockIDLength) fillerSize := (bc.blockSize - listMap[offsets[i]].size) fillerOffset := uint64(offsets[i]*int64(bc.blockSize)) + listMap[offsets[i]].size log.Debug("BlockCache::getBlockIDList : Staging semi zero block for %v=>%v offset %v, size %v", handle.ID, handle.Path, fillerOffset, fillerSize) err := bc.NextComponent().StageData(internal.StageDataOptions{ Name: handle.Path, Data: bc.blockPool.zeroBlock.data[:fillerSize], Id: id, }) if err != nil { log.Err("BlockCache::getBlockIDList : Failed to write semi zero block for %v=>%v [%s]", handle.ID, handle.Path, err.Error()) return nil, nil, err } blockIDList = append(blockIDList, listMap[offsets[i]].id) log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size) // After the flush call we need to merge this particular block with the next block (semi zero block) restageId = append(restageId, listMap[offsets[i]].id) // Add the semi zero block to the list blockIDList = append(blockIDList, id) log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, fillerOffset, id, fillerSize) index++ i++ } else { blockIDList = append(blockIDList, listMap[offsets[i]].id) log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size) index++ i++ } } else { for index < offsets[i] { if !zeroBlockStaged { id, err := bc.stageZeroBlock(handle, 1) if err != nil { return nil, nil, err } zeroBlockStaged = true zeroBlockID = id } blockIDList = append(blockIDList, zeroBlockID) listMap[index] = &blockInfo{ id: zeroBlockID, committed: false, 
size: bc.blockPool.blockSize, } log.Debug("BlockCache::getBlockIDList : Adding zero block for %v=>%s, index %v", handle.ID, handle.Path, index) log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, zero block size %v)", handle.ID, handle.Path, index, zeroBlockID, bc.blockPool.blockSize) index++ } } } return blockIDList, restageId, nil } func (bc *BlockCache) stageZeroBlock(handle *handlemap.Handle, tryCnt int) (string, error) { if tryCnt > MAX_FAIL_CNT { // If we failed to write the data 3 times then just give up log.Err("BlockCache::stageZeroBlock : 3 attempts to upload zero block have failed %v=>%v", handle.ID, handle.Path) return "", fmt.Errorf("3 attempts to upload zero block have failed for %v=>%v", handle.ID, handle.Path) } id := common.GetBlockID(common.BlockIDLength) log.Debug("BlockCache::stageZeroBlock : Staging zero block for %v=>%v, try = %v", handle.ID, handle.Path, tryCnt) err := bc.NextComponent().StageData(internal.StageDataOptions{ Name: handle.Path, Data: bc.blockPool.zeroBlock.data[:], Id: id, }) if err != nil { log.Err("BlockCache::stageZeroBlock : Failed to write zero block for %v=>%v, try %v [%v]", handle.ID, handle.Path, tryCnt, err.Error()) return bc.stageZeroBlock(handle, tryCnt+1) } log.Debug("BlockCache::stageZeroBlock : Zero block id for %v=>%v = %v", handle.ID, handle.Path, id) return id, nil } // diskEvict : Callback when a node from disk expires func (bc *BlockCache) diskEvict(node *list.Element) { fileName := node.Value.(string) // If this block is already locked then return otherwise Lock() will hung up if bc.fileLocks.Locked(fileName) { log.Info("BlockCache::diskEvict : File %s is locked so skipping eviction", fileName) return } // Lock the file name so that its not downloaded when deletion is going on flock := bc.fileLocks.Get(fileName) flock.Lock() defer flock.Unlock() bc.fileNodeMap.Delete(fileName) localPath := filepath.Join(bc.tmpPath, fileName) _ = os.Remove(localPath) } // checkDiskUsage : Callback 
to check usage of disk and decide whether eviction is needed func (bc *BlockCache) checkDiskUsage() bool { data, _ := common.GetUsage(bc.tmpPath) usage := uint32((data * 100) / float64(bc.diskSize/_1MB)) if bc.maxDiskUsageHit { if usage >= MIN_POOL_USAGE { return true } bc.maxDiskUsageHit = false } else { if usage >= MAX_POOL_USAGE { bc.maxDiskUsageHit = true return true } } log.Info("BlockCache::checkDiskUsage : current disk usage : %fMB %v%%", data, usage) log.Info("BlockCache::checkDiskUsage : current cache usage : %v%%", bc.blockPool.Usage()) return false } // invalidateDirectory: Recursively invalidates a directory in the file cache. func (bc *BlockCache) invalidateDirectory(name string) { log.Trace("BlockCache::invalidateDirectory : %s", name) if bc.tmpPath == "" { return } localPath := filepath.Join(bc.tmpPath, name) _ = os.RemoveAll(localPath) } // DeleteDir: Recursively invalidate the directory and its children func (bc *BlockCache) DeleteDir(options internal.DeleteDirOptions) error { log.Trace("BlockCache::DeleteDir : %s", options.Name) err := bc.NextComponent().DeleteDir(options) if err != nil { log.Err("BlockCache::DeleteDir : %s failed", options.Name) return err } bc.invalidateDirectory(options.Name) return err } // RenameDir: Recursively invalidate the source directory and its children func (bc *BlockCache) RenameDir(options internal.RenameDirOptions) error { log.Trace("BlockCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst) err := bc.NextComponent().RenameDir(options) if err != nil { log.Err("BlockCache::RenameDir : error %s [%s]", options.Src, err.Error()) return err } bc.invalidateDirectory(options.Src) return nil } // DeleteFile: Invalidate the file in local cache. 
func (bc *BlockCache) DeleteFile(options internal.DeleteFileOptions) error { log.Trace("BlockCache::DeleteFile : name=%s", options.Name) flock := bc.fileLocks.Get(options.Name) flock.Lock() defer flock.Unlock() err := bc.NextComponent().DeleteFile(options) if err != nil { log.Err("BlockCache::DeleteFile : error %s [%s]", options.Name, err.Error()) return err } localPath := filepath.Join(bc.tmpPath, options.Name) files, err := filepath.Glob(localPath + "*") if err == nil { for _, f := range files { if err := os.Remove(f); err != nil { break } } } return err } // RenameFile: Invalidate the file in local cache. func (bc *BlockCache) RenameFile(options internal.RenameFileOptions) error { log.Trace("BlockCache::RenameFile : src=%s, dst=%s", options.Src, options.Dst) sflock := bc.fileLocks.Get(options.Src) sflock.Lock() defer sflock.Unlock() dflock := bc.fileLocks.Get(options.Dst) dflock.Lock() defer dflock.Unlock() err := bc.NextComponent().RenameFile(options) if err != nil { log.Err("BlockCache::RenameFile : %s failed to rename file [%s]", options.Src, err.Error()) return err } localSrcPath := filepath.Join(bc.tmpPath, options.Src) localDstPath := filepath.Join(bc.tmpPath, options.Dst) files, err := filepath.Glob(localSrcPath + "*") if err == nil { for _, f := range files { err = os.Rename(f, strings.Replace(f, localSrcPath, localDstPath, 1)) if err != nil { break } } } return err } func (bc *BlockCache) SyncFile(options internal.SyncFileOptions) error { log.Trace("BlockCache::SyncFile : handle=%d, path=%s", options.Handle.ID, options.Handle.Path) err := bc.FlushFile(internal.FlushFileOptions{Handle: options.Handle, CloseInProgress: true}) //nolint if err != nil { log.Err("BlockCache::SyncFile : failed to flush file %s", options.Handle.Path) return err } return nil } func (bc *BlockCache) StatFs() (*syscall.Statfs_t, bool, error) { var maxCacheSize uint64 if bc.diskSize > 0 { maxCacheSize = bc.diskSize } else { maxCacheSize = bc.memSize } if maxCacheSize == 0 { return 
nil, false, nil } usage, _ := common.GetUsage(bc.tmpPath) usage = usage * float64(_1MB) available := (float64)(maxCacheSize) - usage statfs := &syscall.Statfs_t{} err := syscall.Statfs("/", statfs) if err != nil { log.Debug("BlockCache::StatFs : statfs err [%s].", err.Error()) return nil, false, err } statfs.Frsize = int64(bc.blockSize) statfs.Blocks = uint64(maxCacheSize) / uint64(bc.blockSize) statfs.Bavail = uint64(math.Max(0, available)) / uint64(bc.blockSize) statfs.Bfree = statfs.Bavail return statfs, true, nil } // ------------------------- Factory ------------------------------------------- // Pipeline will call this method to create your object, initialize your variables here // << DO NOT DELETE ANY AUTO GENERATED CODE HERE >> func NewBlockCacheComponent() internal.Component { comp := &BlockCache{ fileLocks: common.NewLockMap(), } comp.SetName(compName) return comp } // On init register this component to pipeline and supply your constructor func init() { internal.AddComponent(compName, NewBlockCacheComponent) blockSizeMb := config.AddFloat64Flag("block-cache-block-size", 0.0, "Size (in MB) of a block to be downloaded for block-cache.") config.BindPFlag(compName+".block-size-mb", blockSizeMb) blockPoolMb := config.AddUint64Flag("block-cache-pool-size", 0, "Size (in MB) of total memory preallocated for block-cache.") config.BindPFlag(compName+".mem-size-mb", blockPoolMb) blockCachePath := config.AddStringFlag("block-cache-path", "", "Path to store downloaded blocks.") config.BindPFlag(compName+".path", blockCachePath) blockDiskMb := config.AddUint64Flag("block-cache-disk-size", 0, "Size (in MB) of total disk capacity that block-cache can use.") config.BindPFlag(compName+".disk-size-mb", blockDiskMb) blockDiskTimeout := config.AddUint32Flag("block-cache-disk-timeout", 0, "Timeout (in seconds) for which persisted data remains in disk cache.") config.BindPFlag(compName+".disk-timeout-sec", blockDiskTimeout) blockCachePrefetch := 
config.AddUint32Flag("block-cache-prefetch", 0, "Max number of blocks to prefetch.") config.BindPFlag(compName+".prefetch", blockCachePrefetch) blockParallelism := config.AddUint32Flag("block-cache-parallelism", 128, "Number of worker thread responsible for upload/download jobs.") config.BindPFlag(compName+".parallelism", blockParallelism) blockCachePrefetchOnOpen := config.AddBoolFlag("block-cache-prefetch-on-open", false, "Start prefetching on open or wait for first read.") config.BindPFlag(compName+".prefetch-on-open", blockCachePrefetchOnOpen) strongConsistency := config.AddBoolFlag("block-cache-strong-consistency", false, "Enable strong data consistency for block cache.") config.BindPFlag(compName+".consistency", strongConsistency) }