component/azstorage/block_blob.go (1,259 lines of code) (raw):

/* _____ _____ _____ ____ ______ _____ ------ | | | | | | | | | | | | | | | | | | | | | | | | | | | --- | | | | |-----| |---- | | |-----| |----- ------ | | | | | | | | | | | | | | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ Licensed under the MIT License <http://opensource.org/licenses/MIT>. Copyright © 2020-2025 Microsoft Corporation. All rights reserved. Author : <blobfusedev@microsoft.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE */ package azstorage import ( "bytes" "context" "errors" "fmt" "io" "math" "os" "path/filepath" "reflect" "strings" "syscall" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" "github.com/Azure/azure-storage-fuse/v2/common" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" "github.com/Azure/azure-storage-fuse/v2/internal/stats_manager" "github.com/vibhansa-msft/blobfilter" ) const ( folderKey = "hdi_isfolder" symlinkKey = "is_symlink" max_context_timeout = 5 ) type BlockBlob struct { AzStorageConnection Auth azAuth Service *service.Client Container *container.Client blobCPKOpt *blob.CPKInfo downloadOptions *blob.DownloadFileOptions listDetails container.ListBlobsInclude blockLocks common.KeyedMutex } // Verify that BlockBlob implements AzConnection interface var _ AzConnection = &BlockBlob{} const ( MaxBlobSize = blockblob.MaxStageBlockBytes * blockblob.MaxBlocks ) func (bb *BlockBlob) Configure(cfg AzStorageConfig) error { bb.Config = cfg if bb.Config.cpkEnabled { bb.blobCPKOpt = &blob.CPKInfo{ EncryptionKey: &bb.Config.cpkEncryptionKey, EncryptionKeySHA256: &bb.Config.cpkEncryptionKeySha256, EncryptionAlgorithm: to.Ptr(blob.EncryptionAlgorithmTypeAES256), } } bb.downloadOptions = &blob.DownloadFileOptions{ BlockSize: bb.Config.blockSize, Concurrency: bb.Config.maxConcurrency, CPKInfo: bb.blobCPKOpt, } bb.listDetails = container.ListBlobsInclude{ Metadata: true, Deleted: false, Snapshots: false, Permissions: false, //Added to get permissions, acl, group, owner for HNS accounts } return nil } // For dynamic config update the config here func (bb *BlockBlob) UpdateConfig(cfg AzStorageConfig) error { bb.Config.blockSize = cfg.blockSize bb.Config.maxConcurrency = cfg.maxConcurrency bb.Config.defaultTier = cfg.defaultTier bb.Config.ignoreAccessModifiers = cfg.ignoreAccessModifiers return nil } // UpdateServiceClient : Update the SAS specified by the user and create new service client func (bb *BlockBlob) UpdateServiceClient(key, value string) (err error) { if key == "saskey" { bb.Auth.setOption(key, value) // get the service client with updated SAS svcClient, err := bb.Auth.getServiceClient(&bb.Config) if err != nil { log.Err("BlockBlob::UpdateServiceClient : Failed to get service client [%s]", err.Error()) return err } // update the service client bb.Service = svcClient.(*service.Client) // Update the container client bb.Container = bb.Service.NewContainerClient(bb.Config.container) } return nil } // createServiceClient : Create the service client func (bb *BlockBlob) createServiceClient() (*service.Client, error) { log.Trace("BlockBlob::createServiceClient : Getting service client") bb.Auth = getAzAuth(bb.Config.authConfig) if bb.Auth == nil { log.Err("BlockBlob::createServiceClient : Failed to retrieve auth object") return nil, fmt.Errorf("failed to retrieve auth object") } svcClient, err := bb.Auth.getServiceClient(&bb.Config) if err != nil { log.Err("BlockBlob::createServiceClient : Failed to get service client [%s]", err.Error()) return nil, err } return svcClient.(*service.Client), nil } // SetupPipeline : Based on the config setup the ***URLs func (bb *BlockBlob) SetupPipeline() error { log.Trace("BlockBlob::SetupPipeline : Setting up") var err error // create the service client bb.Service, err = bb.createServiceClient() if err != nil { log.Err("BlockBlob::SetupPipeline : Failed to get service client [%s]", err.Error()) return err } // create the container client bb.Container = bb.Service.NewContainerClient(bb.Config.container) return nil } // TestPipeline : Validate the credentials specified in the auth config func (bb *BlockBlob) TestPipeline() error { log.Trace("BlockBlob::TestPipeline : Validating") if bb.Config.mountAllContainers { return nil } if bb.Container == nil || bb.Container.URL() == "" { log.Err("BlockBlob::TestPipeline : Container Client is not built, check your credentials") return nil } listBlobPager := bb.Container.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ MaxResults: to.Ptr((int32)(2)), Prefix: &bb.Config.prefixPath, }) // we are just validating the auth mode used. So, no need to iterate over the pages _, err := listBlobPager.NextPage(context.Background()) if err != nil { log.Err("BlockBlob::TestPipeline : Failed to validate account with given auth %s", err.Error()) var respErr *azcore.ResponseError errors.As(err, &respErr) if respErr != nil { return fmt.Errorf("BlockBlob::TestPipeline : [%s]", respErr.ErrorCode) } return err } return nil } // IsAccountADLS : Check account is ADLS or not func (bb *BlockBlob) IsAccountADLS() bool { includeFields := bb.listDetails includeFields.Permissions = true // for FNS account this property will return back error listBlobPager := bb.Container.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ MaxResults: to.Ptr((int32)(2)), Prefix: &bb.Config.prefixPath, Include: includeFields, }) // we are just validating the auth mode used. So, no need to iterate over the pages _, err := listBlobPager.NextPage(context.Background()) if err == nil { // Call will be successful only when we are able to retrieve the permissions // Permissions will work only in case of HNS accounts log.Crit("BlockBlob::IsAccountADLS : Detected HNS account") return true } var respErr *azcore.ResponseError errors.As(err, &respErr) if respErr != nil { if respErr.ErrorCode == "InvalidQueryParameterValue" { log.Crit("BlockBlob::IsAccountADLS : Detected FNS account") return false } } log.Crit("BlockBlob::IsAccountADLS : Unable to detect account type, assuming FNS [%s]", err.Error()) return false } func (bb *BlockBlob) ListContainers() ([]string, error) { log.Trace("BlockBlob::ListContainers : Listing containers") cntList := make([]string, 0) pager := bb.Service.NewListContainersPager(nil) for pager.More() { resp, err := pager.NextPage(context.Background()) if err != nil { log.Err("BlockBlob::ListContainers : Failed to get container list [%s]", err.Error()) return cntList, err } for _, v := range resp.ContainerItems { cntList = append(cntList, *v.Name) } } return cntList, nil } func (bb *BlockBlob) SetPrefixPath(path string) error { log.Trace("BlockBlob::SetPrefixPath : path %s", path) bb.Config.prefixPath = path return nil } // CreateFile : Create a new file in the container/virtual directory func (bb *BlockBlob) CreateFile(name string, mode os.FileMode) error { log.Trace("BlockBlob::CreateFile : name %s", name) var data []byte return bb.WriteFromBuffer(name, nil, data) } // CreateDirectory : Create a new directory in the container/virtual directory func (bb *BlockBlob) CreateDirectory(name string) error { log.Trace("BlockBlob::CreateDirectory : name %s", name) var data []byte metadata := make(map[string]*string) metadata[folderKey] = to.Ptr("true") return bb.WriteFromBuffer(name, metadata, data) } // CreateLink : Create a symlink in the container/virtual directory func (bb *BlockBlob) CreateLink(source string, target string) error { log.Trace("BlockBlob::CreateLink : %s -> %s", source, target) data := []byte(target) metadata := make(map[string]*string) metadata[symlinkKey] = to.Ptr("true") return bb.WriteFromBuffer(source, metadata, data) } // DeleteFile : Delete a blob in the container/virtual directory func (bb *BlockBlob) DeleteFile(name string) (err error) { log.Trace("BlockBlob::DeleteFile : name %s", name) blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name)) _, err = blobClient.Delete(context.Background(), &blob.DeleteOptions{ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), }) if err != nil { serr := storeBlobErrToErr(err) if serr == ErrFileNotFound { log.Err("BlockBlob::DeleteFile : %s does not exist", name) return syscall.ENOENT } else if serr == BlobIsUnderLease { log.Err("BlockBlob::DeleteFile : %s is under lease [%s]", name, err.Error()) return syscall.EIO } else { log.Err("BlockBlob::DeleteFile : Failed to delete blob %s [%s]", name, err.Error()) return err } } return nil } // DeleteDirectory : Delete a virtual directory in the container/virtual directory func (bb *BlockBlob) DeleteDirectory(name string) (err error) { log.Trace("BlockBlob::DeleteDirectory : name %s", name) err = bb.DeleteFile(name) // libfuse deletes the files in the directory before this method is called. // If the marker blob for directory is not present, ignore the ENOENT error. if err == syscall.ENOENT { err = nil } return err } // RenameFile : Rename the file // Source file must exist in storage account before calling this method. // When the rename is success, Data, metadata, of the blob will be copied to the destination. // Creation time and LMT is not preserved for copyBlob API. // Etag of the destination blob changes. // Copy the LMT to the src attr if the copy is success. // https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob?tabs=microsoft-entra-id func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error { log.Trace("BlockBlob::RenameFile : %s -> %s", source, target) blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, source)) newBlobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, target)) // not specifying source blob metadata, since passing empty metadata headers copies // the source blob metadata to destination blob copyResponse, err := newBlobClient.StartCopyFromURL(context.Background(), blobClient.URL(), &blob.StartCopyFromURLOptions{ Tier: bb.Config.defaultTier, }) if err != nil { serr := storeBlobErrToErr(err) if serr == ErrFileNotFound { //Ideally this case doesn't hit as we are checking for the existence of src //before making the call for RenameFile log.Err("BlockBlob::RenameFile : Src Blob doesn't Exist %s [%s]", source, err.Error()) return syscall.ENOENT } log.Err("BlockBlob::RenameFile : Failed to start copy of file %s [%s]", source, err.Error()) return err } var dstLMT *time.Time = copyResponse.LastModified var dstETag string = sanitizeEtag(copyResponse.ETag) copyStatus := copyResponse.CopyStatus var prop blob.GetPropertiesResponse pollCnt := 0 for copyStatus != nil && *copyStatus == blob.CopyStatusTypePending { time.Sleep(time.Second * 1) pollCnt++ prop, err = newBlobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::RenameFile : CopyStats : Failed to get blob properties for %s [%s]", source, err.Error()) } copyStatus = prop.CopyStatus } if pollCnt > 0 { dstLMT = prop.LastModified dstETag = sanitizeEtag(prop.ETag) } if copyStatus != nil && *copyStatus == blob.CopyStatusTypeSuccess { modifyLMTandEtag(srcAttr, dstLMT, dstETag) } log.Trace("BlockBlob::RenameFile : %s -> %s done", source, target) // Copy of the file is done so now delete the older file err = bb.DeleteFile(source) for retry := 0; retry < 3 && err == syscall.ENOENT; retry++ { // Sometimes backend is able to copy source file to destination but when we try to delete the // source files it returns back with ENOENT. If file was just created on backend it might happen // that it has not been synced yet at all layers and hence delete is not able to find the source file log.Trace("BlockBlob::RenameFile : %s -> %s, unable to find source. Retrying %d", source, target, retry) time.Sleep(1 * time.Second) err = bb.DeleteFile(source) } if err == syscall.ENOENT { // Even after 3 retries, 1 second apart if server returns 404 then source file no longer // exists on the backend and its safe to assume rename was successful err = nil } return err } // RenameDirectory : Rename the directory func (bb *BlockBlob) RenameDirectory(source string, target string) error { log.Trace("BlockBlob::RenameDirectory : %s -> %s", source, target) srcDirPresent := false pager := bb.Container.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ Prefix: to.Ptr(filepath.Join(bb.Config.prefixPath, source) + "/"), }) for pager.More() { listBlobResp, err := pager.NextPage(context.Background()) if err != nil { log.Err("BlockBlob::RenameDirectory : Failed to get list of blobs %s", err.Error()) return err } // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute) for _, blobInfo := range listBlobResp.Segment.BlobItems { srcDirPresent = true srcPath := removePrefixPath(bb.Config.prefixPath, *blobInfo.Name) err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1), nil) if err != nil { log.Err("BlockBlob::RenameDirectory : Failed to rename file %s [%s]", srcPath, err.Error) } } } // To rename source marker blob check its properties before calling rename on it. blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, source)) _, err := blobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { serr := storeBlobErrToErr(err) if serr == ErrFileNotFound { //marker blob doesn't exist for the directory if srcDirPresent { //Some files exist inside the directory return nil } log.Err("BlockBlob::RenameDirectory : %s marker blob does not exist and Src Directory doesn't Exist", source) return syscall.ENOENT } else { log.Err("BlockBlob::RenameDirectory : Failed to get source directory marker blob properties for %s [%s]", source, err.Error()) return err } } return bb.RenameFile(source, target, nil) } func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err error) { log.Trace("BlockBlob::getAttrUsingRest : name %s", name) blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) prop, err := blobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { serr := storeBlobErrToErr(err) if serr == ErrFileNotFound { return attr, syscall.ENOENT } else if serr == InvalidPermission { log.Err("BlockBlob::getAttrUsingRest : Insufficient permissions for %s [%s]", name, err.Error()) return attr, syscall.EACCES } else { log.Err("BlockBlob::getAttrUsingRest : Failed to get blob properties for %s [%s]", name, err.Error()) return attr, err } } // Since block blob does not support acls, we set mode to 0 and FlagModeDefault to true so the fuse layer can return the default permission. attr = &internal.ObjAttr{ Path: name, // We don't need to strip the prefixPath here since we pass the input name Name: filepath.Base(name), Size: *prop.ContentLength, Mode: 0, Mtime: *prop.LastModified, Atime: *prop.LastModified, Ctime: *prop.LastModified, Crtime: *prop.CreationTime, Flags: internal.NewFileBitMap(), MD5: prop.ContentMD5, ETag: sanitizeEtag(prop.ETag), } parseMetadata(attr, prop.Metadata) // We do not get permissions as part of this getAttr call hence setting the flag to true attr.Flags.Set(internal.PropFlagModeDefault) return attr, nil } func (bb *BlockBlob) getAttrUsingList(name string) (attr *internal.ObjAttr, err error) { log.Trace("BlockBlob::getAttrUsingList : name %s", name) iteration := 0 var marker, new_marker *string var blobs []*internal.ObjAttr blobsRead := 0 for marker != nil || iteration == 0 { blobs, new_marker, err = bb.List(name, marker, bb.Config.maxResultsForList) if err != nil { e := storeBlobErrToErr(err) if e == ErrFileNotFound { return attr, syscall.ENOENT } else if e == InvalidPermission { log.Err("BlockBlob::getAttrUsingList : Insufficient permissions for %s [%s]", name, err.Error()) return attr, syscall.EACCES } else { log.Warn("BlockBlob::getAttrUsingList : Failed to list blob properties for %s [%s]", name, err.Error()) } } for i, blob := range blobs { log.Trace("BlockBlob::getAttrUsingList : Item %d Blob %s", i+blobsRead, blob.Name) if blob.Path == name { return blob, nil } } marker = new_marker iteration++ blobsRead += len(blobs) log.Trace("BlockBlob::getAttrUsingList : So far retrieved %d objects in %d iterations", blobsRead, iteration) if new_marker == nil || *new_marker == "" { break } } if err == nil { log.Warn("BlockBlob::getAttrUsingList : blob %s does not exist", name) return nil, syscall.ENOENT } log.Err("BlockBlob::getAttrUsingList : Failed to list blob properties for %s [%s]", name, err.Error()) return nil, err } // GetAttr : Retrieve attributes of the blob func (bb *BlockBlob) GetAttr(name string) (attr *internal.ObjAttr, err error) { log.Trace("BlockBlob::GetAttr : name %s", name) // To support virtual directories with no marker blob, we call list instead of get properties since list will not return a 404 if bb.Config.virtualDirectory { attr, err = bb.getAttrUsingList(name) } else { attr, err = bb.getAttrUsingRest(name) } if bb.Config.filter != nil && attr != nil { if !bb.Config.filter.IsAcceptable(&blobfilter.BlobAttr{ Name: attr.Name, Mtime: attr.Mtime, Size: attr.Size, }) { log.Debug("BlockBlob::GetAttr : Filtered out %s", name) return nil, syscall.ENOENT } } return attr, err } // List : Get a list of blobs matching the given prefix // This fetches the list using a marker so the caller code should handle marker logic // If count=0 - fetch max entries func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*internal.ObjAttr, *string, error) { log.Trace("BlockBlob::List : prefix %s, marker %s", prefix, func(marker *string) string { if marker != nil { return *marker } else { return "" } }(marker)) if count == 0 { count = common.MaxDirListCount } listPath := bb.getListPath(prefix) // Get a result segment starting with the blob indicated by the current Marker. pager := bb.Container.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ Marker: marker, MaxResults: &count, Prefix: &listPath, Include: bb.listDetails, }) listBlob, err := pager.NextPage(context.Background()) // Note: Since we make a list call with a prefix, we will not fail here for a non-existent directory. // The blob service will not validate for us whether or not the path exists. // This is different from ADLS Gen2 behavior. // APIs that may be affected include IsDirEmpty, ReadDir and StreamDir if err != nil { log.Err("BlockBlob::List : Failed to list the container with the prefix %s", err.Error) return nil, nil, err } // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute) // Since block blob does not support acls, we set mode to 0 and FlagModeDefault to true so the fuse layer can return the default permission. blobList, dirList, err := bb.processBlobItems(listBlob.Segment.BlobItems) if err != nil { return nil, nil, err } // In case virtual directory exists but its corresponding 0 byte marker file is not there holding hdi_isfolder then just iterating // over BlobItems will fail to identify that directory. In such cases BlobPrefixes help to list all directories // dirList contains all dirs for which we got 0 byte meta file in this iteration, so exclude those and add rest to the list // Note: Since listing is paginated, sometimes the marker file may come in a different iteration from the BlobPrefix. For such // cases we manually call GetAttr to check the existence of the marker file. err = bb.processBlobPrefixes(listBlob.Segment.BlobPrefixes, dirList, &blobList) if err != nil { return nil, nil, err } return blobList, listBlob.NextMarker, nil } func (bb *BlockBlob) getListPath(prefix string) string { listPath := filepath.Join(bb.Config.prefixPath, prefix) if (prefix != "" && prefix[len(prefix)-1] == '/') || (prefix == "" && bb.Config.prefixPath != "") { listPath += "/" } return listPath } func (bb *BlockBlob) processBlobItems(blobItems []*container.BlobItem) ([]*internal.ObjAttr, map[string]bool, error) { blobList := make([]*internal.ObjAttr, 0) // For some directories 0 byte meta file may not exists so just create a map to figure out such directories dirList := make(map[string]bool) filterAttr := blobfilter.BlobAttr{} for _, blobInfo := range blobItems { blobAttr, err := bb.getBlobAttr(blobInfo) if err != nil { return nil, nil, err } if blobAttr.IsDir() { // 0 byte meta found so mark this directory in map dirList[*blobInfo.Name+"/"] = true blobAttr.Size = 4096 } if bb.Config.filter != nil && !blobAttr.IsDir() { filterAttr.Name = blobAttr.Name filterAttr.Mtime = blobAttr.Mtime filterAttr.Size = blobAttr.Size if bb.Config.filter.IsAcceptable(&filterAttr) { blobList = append(blobList, blobAttr) } else { log.Debug("BlockBlob::List : Filtered out blob %s", blobAttr.Name) } } else { blobList = append(blobList, blobAttr) } } return blobList, dirList, nil } func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAttr, error) { if blobInfo.Properties.CustomerProvidedKeySHA256 != nil && *blobInfo.Properties.CustomerProvidedKeySHA256 != "" { log.Trace("BlockBlob::List : blob is encrypted with customer provided key so fetching metadata explicitly using REST") return bb.getAttrUsingRest(*blobInfo.Name) } mode, err := bb.getFileMode(blobInfo.Properties.Permissions) if err != nil { mode = 0 log.Warn("BlockBlob::getBlobAttr : Failed to get file mode for %s [%s]", *blobInfo.Name, err.Error()) } attr := &internal.ObjAttr{ Path: removePrefixPath(bb.Config.prefixPath, *blobInfo.Name), Name: filepath.Base(*blobInfo.Name), Size: *blobInfo.Properties.ContentLength, Mode: mode, Mtime: *blobInfo.Properties.LastModified, Atime: bb.dereferenceTime(blobInfo.Properties.LastAccessedOn, *blobInfo.Properties.LastModified), Ctime: *blobInfo.Properties.LastModified, Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified), Flags: internal.NewFileBitMap(), MD5: blobInfo.Properties.ContentMD5, ETag: sanitizeEtag(blobInfo.Properties.ETag), } parseMetadata(attr, blobInfo.Metadata) if !bb.listDetails.Permissions { // In case of HNS account do not set this flag attr.Flags.Set(internal.PropFlagModeDefault) } return attr, nil } func (bb *BlockBlob) getFileMode(permissions *string) (os.FileMode, error) { if permissions == nil { return 0, nil } return getFileMode(*permissions) } func (bb *BlockBlob) dereferenceTime(input *time.Time, defaultTime time.Time) time.Time { if input == nil { return defaultTime } return *input } func (bb *BlockBlob) processBlobPrefixes(blobPrefixes []*container.BlobPrefix, dirList map[string]bool, blobList *[]*internal.ObjAttr) error { for _, blobInfo := range blobPrefixes { if _, ok := dirList[*blobInfo.Name]; ok { // marker file found in current iteration, skip adding the directory continue } else { //Check to see if its a HNS account and we received properties in blob prefixes if bb.listDetails.Permissions { attr, err := bb.createDirAttrWithPermissions(blobInfo) if err != nil { return err } *blobList = append(*blobList, attr) } else { // marker file not found in current iteration, so we need to manually check attributes via REST _, err := bb.getAttrUsingRest(*blobInfo.Name) // marker file also not found via manual check, safe to add to list if err == syscall.ENOENT { attr := bb.createDirAttr(*blobInfo.Name) *blobList = append(*blobList, attr) } } } } // Clean up the temp map as its no more needed for k := range dirList { delete(dirList, k) } return nil } func (bb *BlockBlob) createDirAttr(name string) *internal.ObjAttr { // For these dirs we get only the name and no other properties so hardcoding time to current time name = strings.TrimSuffix(name, "/") attr := &internal.ObjAttr{ Path: removePrefixPath(bb.Config.prefixPath, name), Name: filepath.Base(name), Size: 4096, Mode: os.ModeDir, Mtime: time.Now(), Flags: internal.NewDirBitMap(), } attr.Atime = attr.Mtime attr.Crtime = attr.Mtime attr.Ctime = attr.Mtime // This is called only in case of FNS when blobPrefix is there but the marker does not exists attr.Flags.Set(internal.PropFlagModeDefault) return attr } func (bb *BlockBlob) createDirAttrWithPermissions(blobInfo *container.BlobPrefix) (*internal.ObjAttr, error) { if blobInfo.Properties == nil { return nil, fmt.Errorf("failed to get properties of blobprefix %s", *blobInfo.Name) } mode, err := bb.getFileMode(blobInfo.Properties.Permissions) if err != nil { mode = 0 log.Warn("BlockBlob::createDirAttrWithPermissions : Failed to get file mode for %s [%s]", *blobInfo.Name, err.Error()) } name := strings.TrimSuffix(*blobInfo.Name, "/") attr := &internal.ObjAttr{ Path: removePrefixPath(bb.Config.prefixPath, name), Name: filepath.Base(name), Size: *blobInfo.Properties.ContentLength, Mode: mode, Mtime: *blobInfo.Properties.LastModified, Atime: bb.dereferenceTime(blobInfo.Properties.LastAccessedOn, *blobInfo.Properties.LastModified), Ctime: *blobInfo.Properties.LastModified, Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified), Flags: internal.NewDirBitMap(), } return attr, nil } // track the progress of download of blobs where every 100MB of data downloaded is being tracked. It also tracks the completion of download func trackDownload(name string, bytesTransferred int64, count int64, downloadPtr *int64) { if bytesTransferred >= (*downloadPtr)*100*common.MbToBytes || bytesTransferred == count { (*downloadPtr)++ log.Debug("BlockBlob::trackDownload : Download: Blob = %v, Bytes transferred = %v, Size = %v", name, bytesTransferred, count) // send the download progress as an event azStatsCollector.PushEvents(downloadProgress, name, map[string]interface{}{bytesTfrd: bytesTransferred, size: count}) } } // ReadToFile : Download a blob to a local file func (bb *BlockBlob) ReadToFile(name string, offset int64, count int64, fi *os.File) (err error) { log.Trace("BlockBlob::ReadToFile : name %s, offset : %d, count %d", name, offset, count) //defer exectime.StatTimeCurrentBlock("BlockBlob::ReadToFile")() blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name)) downloadPtr := to.Ptr(int64(1)) if common.MonitorBfs() { bb.downloadOptions.Progress = func(bytesTransferred int64) { trackDownload(name, bytesTransferred, count, downloadPtr) } } defer log.TimeTrack(time.Now(), "BlockBlob::ReadToFile", name) dlOpts := *bb.downloadOptions dlOpts.Range = blob.HTTPRange{ Offset: offset, Count: count, } _, err = blobClient.DownloadFile(context.Background(), fi, &dlOpts) if err != nil { e := storeBlobErrToErr(err) if e == ErrFileNotFound { return syscall.ENOENT } else { log.Err("BlockBlob::ReadToFile : Failed to download blob %s [%s]", name, err.Error()) return err } } else { log.Debug("BlockBlob::ReadToFile : Download complete of blob %v", name) // store total bytes downloaded so far azStatsCollector.UpdateStats(stats_manager.Increment, bytesDownloaded, count) } if bb.Config.validateMD5 { // Compute md5 of local file fileMD5, err := common.GetMD5(fi) if err != nil { log.Warn("BlockBlob::ReadToFile : Failed to generate MD5 Sum for %s", name) } else { // Get latest properties from container to get the md5 of blob prop, err := blobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Warn("BlockBlob::ReadToFile : Failed to get properties of blob %s [%s]", name, err.Error()) } else { blobMD5 := prop.ContentMD5 if blobMD5 == nil { log.Warn("BlockBlob::ReadToFile : Failed to get MD5 Sum for blob %s", name) } else { // compare md5 and fail is not match if !reflect.DeepEqual(fileMD5, blobMD5) { log.Err("BlockBlob::ReadToFile : MD5 Sum mismatch %s", name) return errors.New("md5 sum mismatch on download") } } } } } return nil } // ReadBuffer : Download a specific range from a blob to a buffer func (bb *BlockBlob) ReadBuffer(name string, offset int64, len int64) ([]byte, error) { log.Trace("BlockBlob::ReadBuffer : name %s, offset %v, len %v", name, offset, len) var buff []byte if len == 0 { attr, err := bb.GetAttr(name) if err != nil { return buff, err } len = attr.Size - offset } buff = make([]byte, len) blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name)) dlOpts := (blob.DownloadBufferOptions)(*bb.downloadOptions) dlOpts.Range = blob.HTTPRange{ Offset: offset, Count: len, } _, err := blobClient.DownloadBuffer(context.Background(), buff, &dlOpts) if err != nil { e := storeBlobErrToErr(err) if e == ErrFileNotFound { return buff, syscall.ENOENT } else if e == InvalidRange { return buff, syscall.ERANGE } log.Err("BlockBlob::ReadBuffer : Failed to download blob %s [%s]", name, err.Error()) return buff, err } return buff, nil } // ReadInBuffer : Download specific range from a file to a user provided buffer func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error { // log.Trace("BlockBlob::ReadInBuffer : name %s", name) if etag != nil { *etag = "" } blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name)) ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute) defer cancel() opt := &blob.DownloadStreamOptions{ Range: blob.HTTPRange{ Offset: offset, Count: len, }, CPKInfo: bb.blobCPKOpt, } downloadResponse, err := blobClient.DownloadStream(ctx, opt) if err != nil { e := storeBlobErrToErr(err) if e == ErrFileNotFound { return syscall.ENOENT } else if e == InvalidRange { return syscall.ERANGE } log.Err("BlockBlob::ReadInBufferWithETag : Failed to download blob %s [%s]", name, err.Error()) return err } var streamBody io.ReadCloser = downloadResponse.NewRetryReader(ctx, nil) dataRead, err := io.ReadFull(streamBody, data) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s [%s]", name, err.Error()) return err } if dataRead < 0 { log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s", name) return errors.New("failed to copy data from body to buffer") } err = streamBody.Close() if err != nil { log.Err("BlockBlob::ReadInBuffer : Failed to close body for blob %s [%s]", name, err.Error()) } if etag != nil { *etag = sanitizeEtag(downloadResponse.ETag) } return nil } func (bb *BlockBlob) calculateBlockSize(name string, fileSize int64) (blockSize int64, err error) { // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error if fileSize > MaxBlobSize { log.Err("BlockBlob::calculateBlockSize : buffer is too large to upload to a block blob %s", name) err = errors.New("buffer is too large to upload to a block blob") return 0, err } // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request if fileSize <= blockblob.MaxUploadBlobBytes { // Files up to 256MB can be uploaded as a single block blockSize = blockblob.MaxUploadBlobBytes } else { // buffer / max blocks = block size to use all 50,000 blocks blockSize = int64(math.Ceil(float64(fileSize) / blockblob.MaxBlocks)) if blockSize < blob.DefaultDownloadBlockSize { // Block size is smaller then 4MB then consider 4MB as default blockSize = blob.DefaultDownloadBlockSize } else { if (blockSize & (-8)) != 0 { // EXTRA : round off the block size to next higher multiple of 8. // No reason to do so just the odd numbers in block size will not be good on server end is assumption blockSize = (blockSize + 7) & (-8) } if blockSize > blockblob.MaxStageBlockBytes { // After rounding off the blockSize has become bigger then max allowed blocks. log.Err("BlockBlob::calculateBlockSize : blockSize exceeds max allowed block size for %s", name) err = errors.New("block-size is too large to upload to a block blob") return 0, err } } } log.Info("BlockBlob::calculateBlockSize : %s size %v, blockSize %v", name, fileSize, blockSize) return blockSize, nil } // track the progress of upload of blobs where every 100MB of data uploaded is being tracked. It also tracks the completion of upload func trackUpload(name string, bytesTransferred int64, count int64, uploadPtr *int64) { if bytesTransferred >= (*uploadPtr)*100*common.MbToBytes || bytesTransferred == count { (*uploadPtr)++ log.Debug("BlockBlob::trackUpload : Upload: Blob = %v, Bytes transferred = %v, Size = %v", name, bytesTransferred, count) // send upload progress as event azStatsCollector.PushEvents(uploadProgress, name, map[string]interface{}{bytesTfrd: bytesTransferred, size: count}) } } // WriteFromFile : Upload local file to blob func (bb *BlockBlob) WriteFromFile(name string, metadata map[string]*string, fi *os.File) (err error) { log.Trace("BlockBlob::WriteFromFile : name %s", name) //defer exectime.StatTimeCurrentBlock("WriteFromFile::WriteFromFile")() blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) defer log.TimeTrack(time.Now(), "BlockBlob::WriteFromFile", name) uploadPtr := to.Ptr(int64(1)) blockSize := bb.Config.blockSize // get the size of the file stat, err := fi.Stat() if err != nil { log.Err("BlockBlob::WriteFromFile : Failed to get file size %s [%s]", name, err.Error()) return err } // if the block size is not set then we configure it based on file size if blockSize == 0 { // based on file-size calculate block size blockSize, err = bb.calculateBlockSize(name, stat.Size()) if err != nil { return err } } // Compute md5 of this file is requested by user // If file is uploaded in one shot (no blocks created) then server is populating md5 on upload automatically. // hence we take cost of calculating md5 only for files which are bigger in size and which will be converted to blocks. md5sum := []byte{} if bb.Config.updateMD5 && stat.Size() >= blockblob.MaxUploadBlobBytes { md5sum, err = common.GetMD5(fi) if err != nil { // Md5 sum generation failed so set nil while uploading log.Warn("BlockBlob::WriteFromFile : Failed to generate md5 of %s", name) md5sum = []byte{0} } } uploadOptions := &blockblob.UploadFileOptions{ BlockSize: blockSize, Concurrency: bb.Config.maxConcurrency, Metadata: metadata, AccessTier: bb.Config.defaultTier, HTTPHeaders: &blob.HTTPHeaders{ BlobContentType: to.Ptr(getContentType(name)), BlobContentMD5: md5sum, }, CPKInfo: bb.blobCPKOpt, } if common.MonitorBfs() && stat.Size() > 0 { uploadOptions.Progress = func(bytesTransferred int64) { trackUpload(name, bytesTransferred, stat.Size(), uploadPtr) } } _, err = blobClient.UploadFile(context.Background(), fi, uploadOptions) if err != nil { serr := storeBlobErrToErr(err) if serr == BlobIsUnderLease { log.Err("BlockBlob::WriteFromFile : %s is under a lease, can not update file [%s]", name, err.Error()) return syscall.EIO } else if serr == InvalidPermission { log.Err("BlockBlob::WriteFromFile : Insufficient permissions for %s [%s]", name, err.Error()) return syscall.EACCES } else { log.Err("BlockBlob::WriteFromFile : Failed to upload blob %s [%s]", name, err.Error()) } return err } else { log.Debug("BlockBlob::WriteFromFile : Upload complete of blob %v", name) // store total bytes uploaded so far if stat.Size() > 0 { azStatsCollector.UpdateStats(stats_manager.Increment, bytesUploaded, stat.Size()) } } return nil } // WriteFromBuffer : Upload from a buffer to a blob func (bb *BlockBlob) WriteFromBuffer(name string, metadata map[string]*string, data []byte) error { log.Trace("BlockBlob::WriteFromBuffer : name %s", name) blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) defer log.TimeTrack(time.Now(), "BlockBlob::WriteFromBuffer", name) _, err := blobClient.UploadBuffer(context.Background(), data, &blockblob.UploadBufferOptions{ BlockSize: bb.Config.blockSize, Concurrency: bb.Config.maxConcurrency, Metadata: metadata, AccessTier: bb.Config.defaultTier, HTTPHeaders: &blob.HTTPHeaders{ BlobContentType: to.Ptr(getContentType(name)), }, CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::WriteFromBuffer : Failed to upload blob %s [%s]", name, err.Error()) return err } return nil } // GetFileBlockOffsets: store blocks ids and corresponding offsets func (bb *BlockBlob) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) { var blockOffset int64 = 0 blockList := common.BlockOffsetList{} blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) storageBlockList, err := blobClient.GetBlockList(context.Background(), blockblob.BlockListTypeCommitted, nil) if err != nil { log.Err("BlockBlob::GetFileBlockOffsets : Failed to get block list %s ", name, err.Error()) return &common.BlockOffsetList{}, err } // if block list empty its a small file if len(storageBlockList.CommittedBlocks) == 0 { blockList.Flags.Set(common.SmallFile) return &blockList, nil } for _, block := range storageBlockList.CommittedBlocks { blk := &common.Block{ Id: *block.Name, StartIndex: int64(blockOffset), EndIndex: int64(blockOffset) + *block.Size, } blockOffset += *block.Size blockList.BlockList = append(blockList.BlockList, blk) } // blockList.Etag = storageBlockList.ETag() blockList.BlockIdLength = common.GetIdLength(blockList.BlockList[0].Id) return &blockList, nil } func (bb *BlockBlob) createBlock(blockIdLength, startIndex, size int64) *common.Block { newBlockId := common.GetBlockID(blockIdLength) newBlock := &common.Block{ Id: newBlockId, StartIndex: startIndex, EndIndex: startIndex + size, } // mark truncated since it is a new empty block newBlock.Flags.Set(common.TruncatedBlock) newBlock.Flags.Set(common.DirtyBlock) return newBlock } // create new blocks based on the offset and total length we're adding to the file func (bb *BlockBlob) createNewBlocks(blockList *common.BlockOffsetList, offset, length int64) (int64, error) { blockSize := bb.Config.blockSize prevIndex := blockList.BlockList[len(blockList.BlockList)-1].EndIndex numOfBlocks := int64(len(blockList.BlockList)) if blockSize == 0 { blockSize = (16 * 1024 * 1024) if math.Ceil((float64)(numOfBlocks)+(float64)(length)/(float64)(blockSize)) > blockblob.MaxBlocks { blockSize = int64(math.Ceil((float64)(length) / (float64)(blockblob.MaxBlocks-numOfBlocks))) if blockSize > blockblob.MaxStageBlockBytes { return 0, errors.New("cannot accommodate data within the block limit") } } } else if math.Ceil((float64)(numOfBlocks)+(float64)(length)/(float64)(blockSize)) > blockblob.MaxBlocks { return 0, errors.New("cannot accommodate data within the block limit with configured block-size") } // BufferSize is the size of the buffer that will go beyond our current blob (appended) var bufferSize int64 for i := prevIndex; i < offset+length; i += blockSize { blkSize := int64(math.Min(float64(blockSize), float64((offset+length)-i))) newBlock := bb.createBlock(blockList.BlockIdLength, i, blkSize) blockList.BlockList = append(blockList.BlockList, newBlock) // reset the counter to determine if there are leftovers at the end bufferSize += blkSize } return bufferSize, nil } func (bb *BlockBlob) removeBlocks(blockList *common.BlockOffsetList, size int64, name string) *common.BlockOffsetList { _, index := blockList.BinarySearch(size) // if the start index is equal to new size - block should be removed - move one index back if blockList.BlockList[index].StartIndex == size { index = index - 1 } // if the file we're shrinking is in the middle of a block then shrink that block if blockList.BlockList[index].EndIndex > size { blk := blockList.BlockList[index] blk.EndIndex = size blk.Data = make([]byte, blk.EndIndex-blk.StartIndex) blk.Flags.Set(common.DirtyBlock) err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data, nil) if err != nil { log.Err("BlockBlob::removeBlocks : Failed to remove blocks %s [%s]", name, err.Error()) } } blk := blockList.BlockList[index] blk.Flags.Set(common.RemovedBlocks) blockList.BlockList = blockList.BlockList[:index+1] return blockList } func (bb *BlockBlob) TruncateFile(name string, size int64) error { // log.Trace("BlockBlob::TruncateFile : name=%s, size=%d", name, size) attr, err := bb.GetAttr(name) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to get attributes of file %s [%s]", name, err.Error()) if err == syscall.ENOENT { return err } } if size == 0 || attr.Size == 0 { // If we are resizing to a value > 1GB then we need to upload multiple blocks to resize if size > 1*common.GbToBytes { blkSize := int64(16 * common.MbToBytes) blobName := filepath.Join(bb.Config.prefixPath, name) blobClient := bb.Container.NewBlockBlobClient(blobName) blkList := make([]string, 0) id := common.GetBlockID(common.BlockIDLength) for i := 0; size > 0; i++ { if i == 0 || size < blkSize { // Only first and last block we upload and rest all we replicate with the first block itself if size < blkSize { blkSize = size id = common.GetBlockID(common.BlockIDLength) } data := make([]byte, blkSize) _, err = blobClient.StageBlock(context.Background(), id, streaming.NopCloser(bytes.NewReader(data)), &blockblob.StageBlockOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to stage block for %s [%s]", name, err.Error()) return err } } blkList = append(blkList, id) size -= blkSize } err = bb.CommitBlocks(blobName, blkList, nil) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to commit blocks for %s [%s]", name, err.Error()) return err } } else { err := bb.WriteFromBuffer(name, nil, make([]byte, size)) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to set the %s to 0 bytes [%s]", name, err.Error()) } } return err } //If new size is less than 256MB if size < blockblob.MaxUploadBlobBytes { data, err := bb.HandleSmallFile(name, size, attr.Size) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error()) return err } err = bb.WriteFromBuffer(name, nil, data) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to write from buffer file %s", name, err.Error()) return err } } else { bol, err := bb.GetFileBlockOffsets(name) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to get block list of file %s [%s]", name, err.Error()) return err } if bol.SmallFile() { data, err := bb.HandleSmallFile(name, size, attr.Size) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error()) return err } err = bb.WriteFromBuffer(name, nil, data) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to write from buffer file %s", name, err.Error()) return err } } else { if size < attr.Size { bol = bb.removeBlocks(bol, size, name) } else if size > attr.Size { _, err = bb.createNewBlocks(bol, bol.BlockList[len(bol.BlockList)-1].EndIndex, size-attr.Size) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to create new blocks for file %s", name, err.Error()) return err } } err = bb.StageAndCommit(name, bol) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to stage and commit file %s", name, err.Error()) return err } } } return nil } func (bb *BlockBlob) HandleSmallFile(name string, size int64, originalSize int64) ([]byte, error) { var data = make([]byte, size) var err error if size > originalSize { err = bb.ReadInBuffer(name, 0, 0, data, nil) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error()) } } else { err = bb.ReadInBuffer(name, 0, size, data, nil) if err != nil { log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error()) } } return data, err } // Write : write data at given offset to a blob func (bb *BlockBlob) Write(options internal.WriteFileOptions) error { name := options.Handle.Path offset := options.Offset defer log.TimeTrack(time.Now(), "BlockBlob::Write", options.Handle.Path) log.Trace("BlockBlob::Write : name %s offset %v", name, offset) // tracks the case where our offset is great than our current file size (appending only - not modifying pre-existing data) var dataBuffer *[]byte // when the file offset mapping is cached we don't need to make a get block list call fileOffsets, err := bb.GetFileBlockOffsets(name) if err != nil { return err } length := int64(len(options.Data)) data := options.Data // case 1: file consists of no blocks (small file) if fileOffsets.SmallFile() { // get all the data oldData, _ := bb.ReadBuffer(name, 0, 0) // update the data with the new data // if we're only overwriting existing data if int64(len(oldData)) >= offset+length { copy(oldData[offset:], data) dataBuffer = &oldData // else appending and/or overwriting } else { // if the file is not empty then we need to combine the data if len(oldData) > 0 { // new data buffer with the size of old and new data newDataBuffer := make([]byte, offset+length) // copy the old data into it // TODO: better way to do this? if offset != 0 { copy(newDataBuffer, oldData) oldData = nil } // overwrite with the new data we want to add copy(newDataBuffer[offset:], data) dataBuffer = &newDataBuffer } else { dataBuffer = &data } } // WriteFromBuffer should be able to handle the case where now the block is too big and gets split into multiple blocks err := bb.WriteFromBuffer(name, options.Metadata, *dataBuffer) if err != nil { log.Err("BlockBlob::Write : Failed to upload to blob %s ", name, err.Error()) return err } // case 2: given offset is within the size of the blob - and the blob consists of multiple blocks // case 3: new blocks need to be added } else { index, oldDataSize, exceedsFileBlocks, appendOnly := fileOffsets.FindBlocksToModify(offset, length) // keeps track of how much new data will be appended to the end of the file (applicable only to case 3) newBufferSize := int64(0) // case 3? if exceedsFileBlocks { newBufferSize, err = bb.createNewBlocks(fileOffsets, offset, length) if err != nil { log.Err("BlockBlob::Write : Failed to create new blocks for file %s", name, err.Error()) return err } } // buffer that holds that pre-existing data in those blocks we're interested in oldDataBuffer := make([]byte, oldDataSize+newBufferSize) if !appendOnly { // fetch the blocks that will be impacted by the new changes so we can overwrite them err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer, nil) if err != nil { log.Err("BlockBlob::Write : Failed to read data in buffer %s [%s]", name, err.Error()) } } // this gives us where the offset with respect to the buffer that holds our old data - so we can start writing the new data blockOffset := offset - fileOffsets.BlockList[index].StartIndex copy(oldDataBuffer[blockOffset:], data) err := bb.stageAndCommitModifiedBlocks(name, oldDataBuffer, fileOffsets) return err } return nil } // TODO: make a similar method facing stream that would enable us to write to cached blocks then stage and commit func (bb *BlockBlob) stageAndCommitModifiedBlocks(name string, data []byte, offsetList *common.BlockOffsetList) error { blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) blockOffset := int64(0) var blockIDList []string for _, blk := range offsetList.BlockList { blockIDList = append(blockIDList, blk.Id) if blk.Dirty() { _, err := blobClient.StageBlock(context.Background(), blk.Id, streaming.NopCloser(bytes.NewReader(data[blockOffset:(blk.EndIndex-blk.StartIndex)+blockOffset])), &blockblob.StageBlockOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::stageAndCommitModifiedBlocks : Failed to stage to blob %s at block %v [%s]", name, blockOffset, err.Error()) return err } blockOffset = (blk.EndIndex - blk.StartIndex) + blockOffset } } _, err := blobClient.CommitBlockList(context.Background(), blockIDList, &blockblob.CommitBlockListOptions{ HTTPHeaders: &blob.HTTPHeaders{ BlobContentType: to.Ptr(getContentType(name)), }, Tier: bb.Config.defaultTier, CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::stageAndCommitModifiedBlocks : Failed to commit block list to blob %s [%s]", name, err.Error()) return err } return nil } func (bb *BlockBlob) StageAndCommit(name string, bol *common.BlockOffsetList) error { // lock on the blob name so that no stage and commit race condition occur causing failure blobMtx := bb.blockLocks.GetLock(name) blobMtx.Lock() defer blobMtx.Unlock() blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) var blockIDList []string var data []byte staged := false for _, blk := range bol.BlockList { blockIDList = append(blockIDList, blk.Id) if blk.Truncated() { data = make([]byte, blk.EndIndex-blk.StartIndex) blk.Flags.Clear(common.TruncatedBlock) } else { data = blk.Data } if blk.Dirty() { _, err := blobClient.StageBlock(context.Background(), blk.Id, streaming.NopCloser(bytes.NewReader(data)), &blockblob.StageBlockOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::StageAndCommit : Failed to stage to blob %s with ID %s at block %v [%s]", name, blk.Id, blk.StartIndex, err.Error()) return err } staged = true blk.Flags.Clear(common.DirtyBlock) } else if blk.Removed() { staged = true } } if staged { _, err := blobClient.CommitBlockList(context.Background(), blockIDList, &blockblob.CommitBlockListOptions{ HTTPHeaders: &blob.HTTPHeaders{ BlobContentType: to.Ptr(getContentType(name)), }, Tier: bb.Config.defaultTier, CPKInfo: bb.blobCPKOpt, // AccessConditions: &blob.AccessConditions{ModifiedAccessConditions: &blob.ModifiedAccessConditions{IfMatch: bol.Etag}}, }) if err != nil { log.Err("BlockBlob::StageAndCommit : Failed to commit block list to blob %s [%s]", name, err.Error()) return err } // update the etag // bol.Etag = resp.ETag() } return nil } // ChangeMod : Change mode of a blob func (bb *BlockBlob) ChangeMod(name string, _ os.FileMode) error { log.Trace("BlockBlob::ChangeMod : name %s", name) if bb.Config.ignoreAccessModifiers { // for operations like git clone where transaction fails if chmod is not successful // return success instead of ENOSYS return nil } // This is not currently supported for a flat namespace account return syscall.ENOTSUP } // ChangeOwner : Change owner of a blob func (bb *BlockBlob) ChangeOwner(name string, _ int, _ int) error { log.Trace("BlockBlob::ChangeOwner : name %s", name) if bb.Config.ignoreAccessModifiers { // for operations like git clone where transaction fails if chown is not successful // return success instead of ENOSYS return nil } // This is not currently supported for a flat namespace account return syscall.ENOTSUP } // GetCommittedBlockList : Get the list of committed blocks func (bb *BlockBlob) GetCommittedBlockList(name string) (*internal.CommittedBlockList, error) { blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) storageBlockList, err := blobClient.GetBlockList(context.Background(), blockblob.BlockListTypeCommitted, nil) if err != nil { log.Err("BlockBlob::GetFileBlockOffsets : Failed to get block list %s ", name, err.Error()) return nil, err } // if block list empty its a small file if len(storageBlockList.CommittedBlocks) == 0 { return nil, nil } blockList := make(internal.CommittedBlockList, 0) startOffset := int64(0) for _, block := range storageBlockList.CommittedBlocks { blk := internal.CommittedBlock{ Id: *block.Name, Offset: startOffset, Size: uint64(*block.Size), } startOffset += *block.Size blockList = append(blockList, blk) } return &blockList, nil } // StageBlock : stages a block and returns its blockid func (bb *BlockBlob) StageBlock(name string, data []byte, id string) error { log.Trace("BlockBlob::StageBlock : name %s, ID %v, length %v", name, id, len(data)) ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute) defer cancel() blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) _, err := blobClient.StageBlock(ctx, id, streaming.NopCloser(bytes.NewReader(data)), &blockblob.StageBlockOptions{ CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::StageBlock : Failed to stage to blob %s with ID %s [%s]", name, id, err.Error()) return err } return nil } // CommitBlocks : persists the block list func (bb *BlockBlob) CommitBlocks(name string, blockList []string, newEtag *string) error { log.Trace("BlockBlob::CommitBlocks : name %s", name) ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute) defer cancel() blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name)) resp, err := blobClient.CommitBlockList(ctx, blockList, &blockblob.CommitBlockListOptions{ HTTPHeaders: &blob.HTTPHeaders{ BlobContentType: to.Ptr(getContentType(name)), }, Tier: bb.Config.defaultTier, CPKInfo: bb.blobCPKOpt, }) if err != nil { log.Err("BlockBlob::CommitBlocks : Failed to commit block list to blob %s [%s]", name, err.Error()) return err } if newEtag != nil { *newEtag = sanitizeEtag(resp.ETag) } return nil } func (bb *BlockBlob) SetFilter(filter string) error { if filter == "" { bb.Config.filter = nil return nil } bb.Config.filter = &blobfilter.BlobFilter{} return bb.Config.filter.Configure(filter) }