component/azstorage/datalake.go (422 lines of code) (raw):

/* _____ _____ _____ ____ ______ _____ ------ | | | | | | | | | | | | | | | | | | | | | | | | | | | --- | | | | |-----| |---- | | |-----| |----- ------ | | | | | | | | | | | | | | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ Licensed under the MIT License <http://opensource.org/licenses/MIT>. Copyright © 2020-2025 Microsoft Corporation. All rights reserved. Author : <blobfusedev@microsoft.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE */ package azstorage import ( "context" "errors" "fmt" "net/url" "os" "path/filepath" "strings" "syscall" "github.com/Azure/azure-storage-fuse/v2/common" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" "github.com/vibhansa-msft/blobfilter" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service" ) type Datalake struct { AzStorageConnection Auth azAuth Service *service.Client Filesystem *filesystem.Client BlockBlob BlockBlob datalakeCPKOpt *file.CPKInfo } // Verify that Datalake implements AzConnection interface var _ AzConnection = &Datalake{} // transformAccountEndpoint // Users must set an endpoint to allow blobfuse to // 1. support Azure clouds (ex: Public, Zonal DNS, China, Germany, Gov, etc) // 2. direct REST APIs to a truly custom endpoint (ex: www dot custom-domain dot com) // We can handle case 1 by simply replacing the .dfs. to .blob. and blobfuse will work fine. // However, case 2 will not work since the endpoint likely only redirects to the dfs endpoint and not the blob endpoint, so we don't know what endpoint to use when we call blob endpoints. // This is also a known problem with the SDKs. func transformAccountEndpoint(potentialDfsEndpoint string) string { if strings.Contains(potentialDfsEndpoint, ".dfs.") { return strings.Replace(potentialDfsEndpoint, ".dfs.", ".blob.", -1) } else { // Should we just throw here? log.Warn("Datalake::transformAccountEndpoint : Detected use of a custom endpoint. Not all operations are guaranteed to work.") } return potentialDfsEndpoint } // transformConfig transforms the adls config to a blob config func transformConfig(dlConfig AzStorageConfig) AzStorageConfig { bbConfig := dlConfig bbConfig.authConfig.AccountType = EAccountType.BLOCK() bbConfig.authConfig.Endpoint = transformAccountEndpoint(dlConfig.authConfig.Endpoint) return bbConfig } func (dl *Datalake) Configure(cfg AzStorageConfig) error { dl.Config = cfg if dl.Config.cpkEnabled { dl.datalakeCPKOpt = &file.CPKInfo{ EncryptionKey: &dl.Config.cpkEncryptionKey, EncryptionKeySHA256: &dl.Config.cpkEncryptionKeySha256, EncryptionAlgorithm: to.Ptr(directory.EncryptionAlgorithmTypeAES256), } } err := dl.BlockBlob.Configure(transformConfig(cfg)) // List call shall always retrieved permissions for HNS accounts dl.BlockBlob.listDetails.Permissions = true return err } // For dynamic config update the config here func (dl *Datalake) UpdateConfig(cfg AzStorageConfig) error { dl.Config.blockSize = cfg.blockSize dl.Config.maxConcurrency = cfg.maxConcurrency dl.Config.defaultTier = cfg.defaultTier dl.Config.ignoreAccessModifiers = cfg.ignoreAccessModifiers return dl.BlockBlob.UpdateConfig(cfg) } // UpdateServiceClient : Update the SAS specified by the user and create new service client func (dl *Datalake) UpdateServiceClient(key, value string) (err error) { if key == "saskey" { dl.Auth.setOption(key, value) // get the service client with updated SAS svcClient, err := dl.Auth.getServiceClient(&dl.Config) if err != nil { log.Err("Datalake::UpdateServiceClient : Failed to get service client [%s]", err.Error()) return err } // update the service client dl.Service = svcClient.(*service.Client) // Update the filesystem client dl.Filesystem = dl.Service.NewFileSystemClient(dl.Config.container) } return dl.BlockBlob.UpdateServiceClient(key, value) } // createServiceClient : Create the service client func (dl *Datalake) createServiceClient() (*service.Client, error) { log.Trace("Datalake::createServiceClient : Getting service client") dl.Auth = getAzAuth(dl.Config.authConfig) if dl.Auth == nil { log.Err("Datalake::createServiceClient : Failed to retrieve auth object") return nil, fmt.Errorf("failed to retrieve auth object") } svcClient, err := dl.Auth.getServiceClient(&dl.Config) if err != nil { log.Err("Datalake::createServiceClient : Failed to get service client [%s]", err.Error()) return nil, err } return svcClient.(*service.Client), nil } // SetupPipeline : Based on the config setup the ***URLs func (dl *Datalake) SetupPipeline() error { log.Trace("Datalake::SetupPipeline : Setting up") var err error // create the service client dl.Service, err = dl.createServiceClient() if err != nil { log.Err("Datalake::SetupPipeline : Failed to get service client [%s]", err.Error()) return err } // create the filesystem client dl.Filesystem = dl.Service.NewFileSystemClient(dl.Config.container) return dl.BlockBlob.SetupPipeline() } // TestPipeline : Validate the credentials specified in the auth config func (dl *Datalake) TestPipeline() error { log.Trace("Datalake::TestPipeline : Validating") if dl.Config.mountAllContainers { return nil } if dl.Filesystem == nil || dl.Filesystem.DFSURL() == "" || dl.Filesystem.BlobURL() == "" { log.Err("Datalake::TestPipeline : Filesystem Client is not built, check your credentials") return nil } maxResults := int32(2) listPathPager := dl.Filesystem.NewListPathsPager(false, &filesystem.ListPathsOptions{ MaxResults: &maxResults, Prefix: &dl.Config.prefixPath, }) // we are just validating the auth mode used. So, no need to iterate over the pages _, err := listPathPager.NextPage(context.Background()) if err != nil { log.Err("Datalake::TestPipeline : Failed to validate account with given auth %s", err.Error()) var respErr *azcore.ResponseError errors.As(err, &respErr) if respErr != nil { return fmt.Errorf("Datalake::TestPipeline : [%s]", respErr.ErrorCode) } return err } return dl.BlockBlob.TestPipeline() } // IsAccountADLS : Check account is ADLS or not func (dl *Datalake) IsAccountADLS() bool { return dl.BlockBlob.IsAccountADLS() } func (dl *Datalake) ListContainers() ([]string, error) { log.Trace("Datalake::ListContainers : Listing containers") return dl.BlockBlob.ListContainers() } func (dl *Datalake) SetPrefixPath(path string) error { log.Trace("Datalake::SetPrefixPath : path %s", path) dl.Config.prefixPath = path return dl.BlockBlob.SetPrefixPath(path) } // CreateFile : Create a new file in the filesystem/directory func (dl *Datalake) CreateFile(name string, mode os.FileMode) error { log.Trace("Datalake::CreateFile : name %s", name) err := dl.BlockBlob.CreateFile(name, mode) if err != nil { log.Err("Datalake::CreateFile : Failed to create file %s [%s]", name, err.Error()) return err } err = dl.ChangeMod(name, mode) if err != nil { log.Err("Datalake::CreateFile : Failed to set permissions on file %s [%s]", name, err.Error()) return err } return nil } // CreateDirectory : Create a new directory in the filesystem/directory func (dl *Datalake) CreateDirectory(name string) error { log.Trace("Datalake::CreateDirectory : name %s", name) directoryURL := dl.Filesystem.NewDirectoryClient(filepath.Join(dl.Config.prefixPath, name)) _, err := directoryURL.Create(context.Background(), &directory.CreateOptions{ CPKInfo: dl.datalakeCPKOpt, AccessConditions: &directory.AccessConditions{ ModifiedAccessConditions: &directory.ModifiedAccessConditions{ IfNoneMatch: to.Ptr(azcore.ETagAny), }, }, }) if err != nil { serr := storeDatalakeErrToErr(err) if serr == InvalidPermission { log.Err("Datalake::CreateDirectory : Insufficient permissions for %s [%s]", name, err.Error()) return syscall.EACCES } else if serr == ErrFileAlreadyExists { log.Err("Datalake::CreateDirectory : Path already exists for %s [%s]", name, err.Error()) return syscall.EEXIST } else { log.Err("Datalake::CreateDirectory : Failed to create directory %s [%s]", name, err.Error()) return err } } return nil } // CreateLink : Create a symlink in the filesystem/directory func (dl *Datalake) CreateLink(source string, target string) error { log.Trace("Datalake::CreateLink : %s -> %s", source, target) return dl.BlockBlob.CreateLink(source, target) } // DeleteFile : Delete a file in the filesystem/directory func (dl *Datalake) DeleteFile(name string) (err error) { log.Trace("Datalake::DeleteFile : name %s", name) fileClient := dl.Filesystem.NewFileClient(filepath.Join(dl.Config.prefixPath, name)) _, err = fileClient.Delete(context.Background(), nil) if err != nil { serr := storeDatalakeErrToErr(err) if serr == ErrFileNotFound { log.Err("Datalake::DeleteFile : %s does not exist", name) return syscall.ENOENT } else if serr == BlobIsUnderLease { log.Err("Datalake::DeleteFile : %s is under lease [%s]", name, err.Error()) return syscall.EIO } else if serr == InvalidPermission { log.Err("Datalake::DeleteFile : Insufficient permissions for %s [%s]", name, err.Error()) return syscall.EACCES } else { log.Err("Datalake::DeleteFile : Failed to delete file %s [%s]", name, err.Error()) return err } } return nil } // DeleteDirectory : Delete a directory in the filesystem/directory func (dl *Datalake) DeleteDirectory(name string) (err error) { log.Trace("Datalake::DeleteDirectory : name %s", name) directoryClient := dl.Filesystem.NewDirectoryClient(filepath.Join(dl.Config.prefixPath, name)) _, err = directoryClient.Delete(context.Background(), nil) // TODO : There is an ability to pass a continuation token here for recursive delete, should we implement this logic to follow continuation token? The SDK does not currently do this. if err != nil { serr := storeDatalakeErrToErr(err) if serr == ErrFileNotFound { log.Err("Datalake::DeleteDirectory : %s does not exist", name) return syscall.ENOENT } else { log.Err("Datalake::DeleteDirectory : Failed to delete directory %s [%s]", name, err.Error()) return err } } return nil } // RenameFile : Rename the file // While renaming the file, Creation time is preserved but LMT is changed for the destination blob. // and also Etag of the destination blob changes func (dl *Datalake) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error { log.Trace("Datalake::RenameFile : %s -> %s", source, target) fileClient := dl.Filesystem.NewFileClient(url.PathEscape(filepath.Join(dl.Config.prefixPath, source))) renameResponse, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{ CPKInfo: dl.datalakeCPKOpt, }) if err != nil { serr := storeDatalakeErrToErr(err) if serr == ErrFileNotFound { log.Err("Datalake::RenameFile : %s does not exist", source) return syscall.ENOENT } else { log.Err("Datalake::RenameFile : Failed to rename file %s to %s [%s]", source, target, err.Error()) return err } } modifyLMTandEtag(srcAttr, renameResponse.LastModified, sanitizeEtag(renameResponse.ETag)) return nil } // RenameDirectory : Rename the directory func (dl *Datalake) RenameDirectory(source string, target string) error { log.Trace("Datalake::RenameDirectory : %s -> %s", source, target) directoryClient := dl.Filesystem.NewDirectoryClient(url.PathEscape(filepath.Join(dl.Config.prefixPath, source))) _, err := directoryClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &directory.RenameOptions{ CPKInfo: dl.datalakeCPKOpt, }) if err != nil { serr := storeDatalakeErrToErr(err) if serr == ErrFileNotFound { log.Err("Datalake::RenameDirectory : %s does not exist", source) return syscall.ENOENT } else { log.Err("Datalake::RenameDirectory : Failed to rename directory %s to %s [%s]", source, target, err.Error()) return err } } return nil } // GetAttr : Retrieve attributes of the path func (dl *Datalake) GetAttr(name string) (blobAttr *internal.ObjAttr, err error) { log.Trace("Datalake::GetAttr : name %s", name) fileClient := dl.Filesystem.NewFileClient(filepath.Join(dl.Config.prefixPath, name)) prop, err := fileClient.GetProperties(context.Background(), &file.GetPropertiesOptions{ CPKInfo: dl.datalakeCPKOpt, }) if err != nil { e := storeDatalakeErrToErr(err) if e == ErrFileNotFound { return blobAttr, syscall.ENOENT } else if e == InvalidPermission { log.Err("Datalake::GetAttr : Insufficient permissions for %s [%s]", name, err.Error()) return blobAttr, syscall.EACCES } else { log.Err("Datalake::GetAttr : Failed to get path properties for %s [%s]", name, err.Error()) return blobAttr, err } } mode, err := getFileMode(*prop.Permissions) if err != nil { log.Err("Datalake::GetAttr : Failed to get file mode for %s [%s]", name, err.Error()) return blobAttr, err } blobAttr = &internal.ObjAttr{ Path: name, Name: filepath.Base(name), Size: *prop.ContentLength, Mode: mode, Mtime: *prop.LastModified, Atime: *prop.LastModified, Ctime: *prop.LastModified, Crtime: *prop.LastModified, Flags: internal.NewFileBitMap(), ETag: sanitizeEtag(prop.ETag), } parseMetadata(blobAttr, prop.Metadata) if *prop.ResourceType == "directory" { blobAttr.Flags = internal.NewDirBitMap() blobAttr.Mode = blobAttr.Mode | os.ModeDir } if dl.Config.honourACL && dl.Config.authConfig.ObjectID != "" { acl, err := fileClient.GetAccessControl(context.Background(), nil) if err != nil { // Just ignore the error here as rest of the attributes have been retrieved log.Err("Datalake::GetAttr : Failed to get ACL for %s [%s]", name, err.Error()) } else { mode, err := getFileModeFromACL(dl.Config.authConfig.ObjectID, *acl.ACL, *acl.Owner) if err != nil { log.Err("Datalake::GetAttr : Failed to get file mode from ACL for %s [%s]", name, err.Error()) } else { blobAttr.Mode = mode } } } if dl.Config.filter != nil { if !dl.Config.filter.IsAcceptable(&blobfilter.BlobAttr{ Name: blobAttr.Name, Mtime: blobAttr.Mtime, Size: blobAttr.Size, }) { log.Debug("Datalake::GetAttr : Filtered out %s", name) return nil, syscall.ENOENT } } return blobAttr, nil } // List : Get a list of path matching the given prefix // This fetches the list using a marker so the caller code should handle marker logic // If count=0 - fetch max entries func (dl *Datalake) List(prefix string, marker *string, count int32) ([]*internal.ObjAttr, *string, error) { return dl.BlockBlob.List(prefix, marker, count) } // ReadToFile : Download a file to a local file func (dl *Datalake) ReadToFile(name string, offset int64, count int64, fi *os.File) (err error) { return dl.BlockBlob.ReadToFile(name, offset, count, fi) } // ReadBuffer : Download a specific range from a file to a buffer func (dl *Datalake) ReadBuffer(name string, offset int64, len int64) ([]byte, error) { return dl.BlockBlob.ReadBuffer(name, offset, len) } // ReadInBuffer : Download specific range from a file to a user provided buffer func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error { return dl.BlockBlob.ReadInBuffer(name, offset, len, data, etag) } // WriteFromFile : Upload local file to file func (dl *Datalake) WriteFromFile(name string, metadata map[string]*string, fi *os.File) (err error) { // File in DataLake may have permissions and ACL set. Just uploading the file will override them. // So, we need to get the existing permissions and ACL and set them back after uploading the file. var acl string = "" var fileClient *file.Client = nil if dl.Config.preserveACL { fileClient = dl.Filesystem.NewFileClient(filepath.Join(dl.Config.prefixPath, name)) resp, err := fileClient.GetAccessControl(context.Background(), nil) if err != nil { log.Err("Datalake::getACL : Failed to get ACLs for file %s [%s]", name, err.Error()) } else if resp.ACL != nil { acl = *resp.ACL } } // Upload the file, which will override the permissions and ACL retCode := dl.BlockBlob.WriteFromFile(name, metadata, fi) if acl != "" { // Cannot set both permissions and ACL in one call. ACL includes permission as well so just setting those back // Just setting up the permissions will delete existing ACLs applied on the blob so do not convert this code to // just set the permissions. _, err := fileClient.SetAccessControl(context.Background(), &file.SetAccessControlOptions{ ACL: &acl, }) if err != nil { // Earlier code was ignoring this so it might break customer cases where they do not have auth to update ACL log.Err("Datalake::WriteFromFile : Failed to set ACL for %s [%s]", name, err.Error()) } } return retCode } // WriteFromBuffer : Upload from a buffer to a file func (dl *Datalake) WriteFromBuffer(name string, metadata map[string]*string, data []byte) error { return dl.BlockBlob.WriteFromBuffer(name, metadata, data) } // Write : Write to a file at given offset func (dl *Datalake) Write(options internal.WriteFileOptions) error { return dl.BlockBlob.Write(options) } func (dl *Datalake) StageAndCommit(name string, bol *common.BlockOffsetList) error { return dl.BlockBlob.StageAndCommit(name, bol) } func (dl *Datalake) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) { return dl.BlockBlob.GetFileBlockOffsets(name) } func (dl *Datalake) TruncateFile(name string, size int64) error { return dl.BlockBlob.TruncateFile(name, size) } // ChangeMod : Change mode of a path func (dl *Datalake) ChangeMod(name string, mode os.FileMode) error { log.Trace("Datalake::ChangeMod : Change mode of file %s to %s", name, mode) fileClient := dl.Filesystem.NewFileClient(filepath.Join(dl.Config.prefixPath, name)) /* // If we need to call the ACL set api then we need to get older acl string here // and create new string with the username included in the string // Keeping this code here so in future if its required we can get the string and manipulate currPerm, err := fileURL.getACL(context.Background()) e := storeDatalakeErrToErr(err) if e == ErrFileNotFound { return syscall.ENOENT } else if err != nil { log.Err("Datalake::ChangeMod : Failed to get mode of file %s [%s]", name, err.Error()) return err } */ newPerm := getACLPermissions(mode) _, err := fileClient.SetAccessControl(context.Background(), &file.SetAccessControlOptions{ Permissions: &newPerm, }) if err != nil { log.Err("Datalake::ChangeMod : Failed to change mode of file %s to %s [%s]", name, mode, err.Error()) e := storeDatalakeErrToErr(err) if e == ErrFileNotFound { return syscall.ENOENT } else if e == InvalidPermission { return syscall.EACCES } else { return err } } return nil } // ChangeOwner : Change owner of a path func (dl *Datalake) ChangeOwner(name string, _ int, _ int) error { log.Trace("Datalake::ChangeOwner : name %s", name) if dl.Config.ignoreAccessModifiers { // for operations like git clone where transaction fails if chown is not successful // return success instead of ENOSYS return nil } // TODO: This is not supported for now. // fileURL := dl.Filesystem.NewRootDirectoryURL().NewFileURL(filepath.Join(dl.Config.prefixPath, name)) // group := strconv.Itoa(gid) // owner := strconv.Itoa(uid) // _, err := fileURL.SetAccessControl(context.Background(), azbfs.BlobFSAccessControl{Group: group, Owner: owner}) // e := storeDatalakeErrToErr(err) // if e == ErrFileNotFound { // return syscall.ENOENT // } else if err != nil { // log.Err("Datalake::ChangeOwner : Failed to change ownership of file %s to %s [%s]", name, mode, err.Error()) // return err // } return syscall.ENOTSUP } // GetCommittedBlockList : Get the list of committed blocks func (dl *Datalake) GetCommittedBlockList(name string) (*internal.CommittedBlockList, error) { return dl.BlockBlob.GetCommittedBlockList(name) } // StageBlock : stages a block and returns its blockid func (dl *Datalake) StageBlock(name string, data []byte, id string) error { return dl.BlockBlob.StageBlock(name, data, id) } // CommitBlocks : persists the block list func (dl *Datalake) CommitBlocks(name string, blockList []string, newEtag *string) error { return dl.BlockBlob.CommitBlocks(name, blockList, newEtag) } func (dl *Datalake) SetFilter(filter string) error { if filter == "" { dl.Config.filter = nil } else { dl.Config.filter = &blobfilter.BlobFilter{} err := dl.Config.filter.Configure(filter) if err != nil { return err } } return dl.BlockBlob.SetFilter(filter) }